Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite tests to use new KIP-470 API #301

Merged
merged 3 commits into from
Oct 30, 2019
Merged

Rewrite tests to use new KIP-470 API #301

merged 3 commits into from
Oct 30, 2019

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Oct 29, 2019

No description provided.

@mjsax mjsax requested a review from a team October 29, 2019 04:38
@@ -84,7 +84,7 @@ private KafkaStreams processStreams(final String bootstrapServers, final String
final GlobalKTable<Long, Customer> customers = builder.globalTable(CUSTOMERS.name(),
Consumed.with(CUSTOMERS.keySerde(), CUSTOMERS.valueSerde()));

final Joined<String, Order, Payment> serdes = Joined
final StreamJoined<String, Order, Payment> serdes = StreamJoined
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KIP-479 cleanup

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the KIP-479 cleanups!

@@ -135,7 +135,7 @@ public void stop() {
void sendEmail(EmailTuple details);
}

public class EmailTuple {
public static class EmailTuple {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup

@@ -47,13 +47,13 @@
ORDERS.valueSerde());
private final Grouped<String, OrderValidation> serdes3 = Grouped
.with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde());
private final Joined<String, Long, Order> serdes4 = Joined
private final StreamJoined<String, Long, Order> serdes4 = StreamJoined
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KIP-479 cleanup

@@ -72,7 +70,7 @@ public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception {
record.put("user", "alice");
record.put("is_new", true);
record.put("content", "lorem ipsum");
final List<GenericRecord> inputValues = Collections.singletonList(record);
final List<Object> inputValues = Collections.singletonList(record);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new KIP-470 API cannot resolve GenericRecord as type, because KafkaAvroSerializer uses Object...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, that's a bummer.

* @param valueSerializer The {@link Serializer} corresponding to the value type
* @param <V> Value type of the data records
*/
static <K, V> void produceValuesSynchronously(final String topic,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new KIP-470 API, we don't need those helpers any longer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yay!

@@ -58,29 +59,26 @@ public void shouldUppercaseTheInput() {
final String inputTopic = "inputTopic";
final String outputTopic = "outputTopic";
final KStream<byte[], String> input = builder.stream(inputTopic);
final KStream<byte[], String> uppercased = input.mapValues(s -> s.toUpperCase());
final KStream<byte[], String> uppercased = input.mapValues((ValueMapper<String, String>) String::toUpperCase);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup

MockSchemaRegistry.dropScope(SCHEMA_REGISTRY_SCOPE);
}

@Test
public void shouldCountPlayEventsBySession() throws Exception {
public void shouldCountPlayEventsBySession() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side cleanup -- this method does not throw

.keyValueStoreBuilder(
Stores.persistentKeyValueStore("WordCountsStore"),
Serdes.String(),
Serdes.Long())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix indentation only

@@ -99,7 +99,7 @@ public void shouldJoinTwoStreams() {
// In this specific example, we don't need to define join serdes explicitly because the key, left value, and
// right value are all of type String, which matches our default serdes configured for the application. However,
// we want to showcase the use of `Joined.with(...)` in case your code needs a different type setup.
Joined.with(
StreamJoined.with(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KIP-479 cleanup

// Compute the total per region by summing the individual click counts per region.
.groupByKey(Grouped.with(stringSerde, longSerde))
.reduce(Long::sum);
// Join the stream against the table.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix indentation only

(regionValue, lastLoginValue) -> regionValue + "/" + lastLoginValue,
Materialized.as(storeName))
(regionValue, lastLoginValue) -> regionValue + "/" + lastLoginValue,
Materialized.as(storeName))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix indentation only

*
* @return ProducerRecord containing word as key and count as value
*/
private ProducerRecord<String, Long> readOutput() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With KIP-470, we don't need those helpers any longer

@@ -61,220 +61,243 @@

@Test
public void shouldSumNumbersOnSameDay() {
final List<MyEvent> inputValues = Arrays.asList(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replacing MyEvent with new KIP-470 `TestRecord:

  • requires to pass in null key
  • required to pass timestamp as Instant

new MyEvent(7, ZonedDateTime.of(2019, 1, 1, 16, 31, 0, 0, zone)),
dummyEventToForceSuppression()
);
final List<ExpectedResult> expectedValues = Arrays.asList(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replacing ExpectedResult with simple KeyValue pair

);
verify(inputValues, expectedValues, zone);
final List<KeyValue<Windowed<Integer>, Integer>> expectedValues =
Collections.singletonList(KeyValue.pair(toWindowed(1,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using toWindowed to get Windowed<K> for the key passing in key, start/end ts


}

private class MyEvent {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MyEvent replace with TestRecord from KIP-470

}
}

private class ExpectedResult {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExpectedResult replace with KeyValue pair

}
}

private void verifyResults(final List<ExpectedResult> expectedValues, final TopologyTestDriver testDriver) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed any longer -- using KIP-470 API now

}

private void injectFakeData(final List<MyEvent> inputValues, final TopologyTestDriver testDriver) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed any longer -- using KIP-470 API now

@@ -85,25 +86,28 @@ object IntegrationTestScalaUtils {
def produceValuesSynchronously[V](topic: String, values: Seq[V], driver: TopologyTestDriver)
(implicit valueSerializer: Serializer[V]): Unit = {
import collection.JavaConverters._
IntegrationTestUtils.produceValuesSynchronously(topic, values.asJava, driver, valueSerializer)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we removed those helper, we call the TopologyTestDriver method directly -- it's still useful to keep the Scala helpers for explicit Serde support

Copy link
Member

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, thanks @mjsax ! I had one high-level suggestion, and a few smaller comments.
-John

testDriver.createInputTopic(inputTopic,
new IntegrationTestUtils.NothingSerde<>(),
new StringSerializer())
.pipeValueList(inputValues);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tentatively, I think it might set a better example for users if we establish a pattern like:

try (testDriver = new TopologyTestDriver(...)) {
  final TestInputTopic<...> inputTopic = testDriver.createInputTopic(...);
  final TestOutputTopic<...> outputTopic = testDriver.createOutputTopic(...);

  inputTopic.pipeValueList(inputValues);
  assertThat(outputTopic.readKeyValuesToMap()).isEqualTo(expectedOutput);
}

In other words, I feel it leads to more readable tests if we declare the inputs and outputs right up front, and then push data through and assert the results later on.

@@ -72,7 +70,7 @@ public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception {
record.put("user", "alice");
record.put("is_new", true);
record.put("content", "lorem ipsum");
final List<GenericRecord> inputValues = Collections.singletonList(record);
final List<Object> inputValues = Collections.singletonList(record);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, that's a bummer.

topologyTestDriver.createInputTopic(inputTopic,
new IntegrationTestUtils.NothingSerde<>(),
new LongSerializer())
.pipeValueList(inputValues);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like declaring the same input topic twice may disguise the intent of this test. Do you think declaring it once with a ByteArray value serializer, and then piping in invalid and valid data arrays would be clearer?

* @param valueSerializer The {@link Serializer} corresponding to the value type
* @param <V> Value type of the data records
*/
static <K, V> void produceValuesSynchronously(final String topic,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yay!

playEventSerializer,
start
);
topologyTestDriver.createInputTopic(SessionWindowsExample.PLAY_EVENTS,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can also create the input and output topics in the @Before method so we can tidy up these tests.

streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
// The commit interval for flushing records to state stores and downstream must be lower than
// this integration test's timeout (30 secs) to ensure we observe the expected processing results.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This config has no effect now, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Seems to be a left over from moving from "EmbeddedKafka" to TDD.

Copy link
Contributor

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clean-up @mjsax, just one minor comment but LGTM modulo @vvcephei comments.

@@ -84,7 +84,7 @@ private KafkaStreams processStreams(final String bootstrapServers, final String
final GlobalKTable<Long, Customer> customers = builder.globalTable(CUSTOMERS.name(),
Consumed.with(CUSTOMERS.keySerde(), CUSTOMERS.valueSerde()));

final Joined<String, Order, Payment> serdes = Joined
final StreamJoined<String, Order, Payment> serdes = StreamJoined
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the KIP-479 cleanups!

@@ -99,7 +99,7 @@ public void shouldJoinTwoStreams() {
// In this specific example, we don't need to define join serdes explicitly because the key, left value, and
// right value are all of type String, which matches our default serdes configured for the application. However,
// we want to showcase the use of `Joined.with(...)` in case your code needs a different type setup.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: update comment to use StreamJoined as well.

@mjsax
Copy link
Member Author

mjsax commented Oct 29, 2019

Updated this. (Added some side cleanup to use hamcrest for testing in all test)

Copy link
Member

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks so much, @mjsax ; this looks awesome!

@mjsax mjsax changed the base branch from master to 5.4.x October 30, 2019 20:23
@mjsax
Copy link
Member Author

mjsax commented Oct 30, 2019

Rebasing this to new branch 5.4.x and also resolve merge conflicts.

@mjsax mjsax merged commit 1f62f73 into 5.4.x Oct 30, 2019
@mjsax mjsax deleted the kip-470-rewrites branch October 30, 2019 22:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants