Rewrite tests to use new KIP-470 API #301

Merged
merged 3 commits on Oct 30, 2019
@@ -21,9 +21,9 @@
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -84,7 +84,7 @@ private KafkaStreams processStreams(final String bootstrapServers, final String
final GlobalKTable<Long, Customer> customers = builder.globalTable(CUSTOMERS.name(),
Consumed.with(CUSTOMERS.keySerde(), CUSTOMERS.valueSerde()));

final Joined<String, Order, Payment> serdes = Joined
final StreamJoined<String, Order, Payment> serdes = StreamJoined
Member Author:
KIP-479 cleanup

Contributor:
Thanks for the KIP-479 cleanups!

.with(ORDERS.keySerde(), ORDERS.valueSerde(), PAYMENTS.valueSerde());

//Join the two streams and the table then send an email for each
@@ -135,7 +135,7 @@ interface Emailer {
void sendEmail(EmailTuple details);
}

public class EmailTuple {
public static class EmailTuple {
Member Author:
side cleanup


public Order order;
public Payment payment;
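
For context: KIP-479 renames the serde container for stream-stream joins from `Joined` to `StreamJoined`, which is the substitution made in this file. A minimal sketch of the new call shape, using illustrative topic names and serdes rather than this service's own types:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;

public class StreamJoinedSketch {

  public static void main(final String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();
    // Illustrative topics and value types, not the ones used by the service above.
    final KStream<String, Long> orders = builder.stream(
        "orders", Consumed.with(Serdes.String(), Serdes.Long()));
    final KStream<String, Double> payments = builder.stream(
        "payments", Consumed.with(Serdes.String(), Serdes.Double()));

    // KIP-479: the serdes for a stream-stream join are now passed via StreamJoined,
    // which names the key serde plus the serdes of both joined value types.
    orders
        .join(payments,
              (orderValue, paymentValue) -> orderValue + ":" + paymentValue,
              JoinWindows.of(Duration.ofMinutes(5)),
              StreamJoined.with(Serdes.String(), Serdes.Long(), Serdes.Double()))
        .to("joined", Produced.with(Serdes.String(), Serdes.String()));
  }
}
```

The change in the diff is exactly this swap: `Joined.with(keySerde, valueSerde, otherValueSerde)` becomes `StreamJoined.with(...)` with the same three serdes.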
@@ -19,11 +19,11 @@
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -47,13 +47,13 @@ public class ValidationsAggregatorService implements Service {
ORDERS.valueSerde());
private final Grouped<String, OrderValidation> serdes3 = Grouped
.with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde());
private final Joined<String, Long, Order> serdes4 = Joined
private final StreamJoined<String, Long, Order> serdes4 = StreamJoined
Member Author:
KIP-479 cleanup

.with(ORDERS.keySerde(), Serdes.Long(), ORDERS.valueSerde());
private final Produced<String, Order> serdes5 = Produced
.with(ORDERS.keySerde(), ORDERS.valueSerde());
private final Grouped<String, Order> serdes6 = Grouped
.with(ORDERS.keySerde(), ORDERS.valueSerde());
private final Joined<String, OrderValidation, Order> serdes7 = Joined
private final StreamJoined<String, OrderValidation, Order> serdes7 = StreamJoined
.with(ORDERS.keySerde(), ORDER_VALIDATIONS.valueSerde(), ORDERS.valueSerde());

private KafkaStreams streams;
34 changes: 25 additions & 9 deletions src/test/java/io/confluent/examples/streams/AggregateTest.java
@@ -21,6 +21,8 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
@@ -37,7 +39,8 @@
import java.util.Map;
import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

/**
* How to aggregate messages via `groupBy()` and `aggregate()`.
@@ -69,18 +72,32 @@ public void shouldAggregate() {
expectedOutput.put("k", 5L);
expectedOutput.put("s", 22L);

// Step 1: Create the topology and its configuration
//
// Step 1: Create the topology and its configuration.
//
final StreamsBuilder builder = createTopology();
final Properties streamsConfiguration = createTopologyConfiguration();

try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) {
// Step 2: Write the input
IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, testDriver, new StringSerializer());
//
// Step 2: Setup input and output topics.
//
final TestInputTopic<Void, String> input = testDriver
.createInputTopic(inputTopic,
new IntegrationTestUtils.NothingSerde<>(),
new StringSerializer());
final TestOutputTopic<String, Long> output = testDriver
.createOutputTopic(outputTopic, new StringDeserializer(), new LongDeserializer());

// Step 3: Validate the output
final Map<String, Long> actualOutput = IntegrationTestUtils.drainTableOutput(
outputTopic, testDriver, new StringDeserializer(), new LongDeserializer());
assertThat(actualOutput).isEqualTo(expectedOutput);
//
// Step 3: Write the input.
//
input.pipeValueList(inputValues);

//
// Step 4: Validate the output.
//
assertThat(output.readKeyValuesToMap(), equalTo(expectedOutput));
}
}

@@ -110,5 +127,4 @@ private Properties createTopologyConfiguration() {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
return streamsConfiguration;
}

}
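
For context: this is the KIP-470 pattern the rewritten tests use, with `TestInputTopic`/`TestOutputTopic` replacing the old produce/drain helpers. A self-contained sketch against a toy topology; the topic names, topology, and config values below are illustrative and not taken from the PR:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class Kip470Sketch {

  public static void main(final String[] args) {
    // Toy topology: re-key each record by its value and emit the value's length.
    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .map((key, value) -> KeyValue.pair(value, (long) value.length()))
        .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "kip470-sketch");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
      // KIP-470: topics are driven through typed TestInputTopic/TestOutputTopic handles
      // instead of ConsumerRecordFactory plus driver.readOutput().
      final TestInputTopic<String, String> input =
          driver.createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
      final TestOutputTopic<String, Long> output =
          driver.createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());

      input.pipeInput("k1", "hello");
      input.pipeInput("k2", "streams");

      // Prints: [KeyValue(hello, 5), KeyValue(streams, 7)]
      System.out.println(output.readKeyValuesToList());
    }
  }
}
```

`readKeyValuesToMap()` (used in AggregateTest above) and `readValuesToList()` are the analogous one-call replacements for the old `drainTableOutput` and `drainStreamOutput` helpers.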
@@ -41,7 +41,8 @@
import java.util.List;
import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class ApplicationResetIntegrationTest {
@ClassRule
@@ -114,7 +115,7 @@ public void shouldReprocess() throws Exception {
outputTopic,
inputValues.size()
);
assertThat(result).isEqualTo(expectedResult);
assertThat(result, equalTo(expectedResult));

streams.close();
}
@@ -165,7 +166,7 @@ public void shouldReprocess() throws Exception {
outputTopic,
inputValues.size()
);
assertThat(resultRerun).isEqualTo(expectedResult);
assertThat(resultRerun, equalTo(expectedResult));

streams.close();
}
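
A side change repeated throughout the PR: assertions move from AssertJ's fluent `assertThat(actual).isEqualTo(expected)` to Hamcrest's `assertThat(actual, matcher)` style. A minimal sketch of the Hamcrest form (class and variable names are illustrative):

```java
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import java.util.Arrays;
import java.util.List;

public class HamcrestStyleSketch {

  public static void main(final String[] args) {
    final List<String> expected = Arrays.asList("a", "b", "c");
    final List<String> actual = Arrays.asList("a", "b", "c");

    // Hamcrest: assertThat(actual, matcher). equalTo() compares lists element by
    // element in order, which is why it can also stand in for AssertJ's
    // containsExactlyElementsOf() in the custom-join test below.
    assertThat(actual, equalTo(expected));
  }
}
```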
@@ -57,7 +57,8 @@
import io.confluent.examples.streams.utils.PairOfDoubleAndLongDeserializer;
import io.confluent.examples.streams.utils.PairSerde;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

/**
* End-to-end integration test that demonstrates one way to implement a custom join operation.
@@ -265,7 +266,7 @@ public void shouldTriggerStreamTableJoinFromTable() throws Exception {
//
final List<KeyValue<String, Pair<Double, Long>>> actualRecords =
readOutputDataFromJoinedStream(expectedOutputRecords.size());
assertThat(actualRecords).containsExactlyElementsOf(expectedOutputRecords);
assertThat(actualRecords, equalTo(expectedOutputRecords));
}
}

@@ -321,7 +322,7 @@ private KeyValue<String, Pair<Double, Long>> sendFullJoinRecordOrWaitForTableSid
LOG.info("Table data available for key {}, sending fully populated join message {}", key, joinRecord);
return joinRecord;
} else {
LOG.info("Table data unavailable for key {}, sending the join result as null", key, key, value);
LOG.info("Table data unavailable for key {}, sending the join result as null", key);
return KeyValue.pair(key, new Pair<>(value, null));
}
}
@@ -333,12 +334,9 @@ private boolean withinAcceptableBounds(final Instant streamRecordTimestamp,
}

@Override
public void close() {
}

public void close() {}
};
}

}

private void writeInputDataToStream(final List<KeyValueWithTimestamp<String, Double>> inputStreamRecords)
@@ -376,5 +374,4 @@ private List<KeyValue<String, Pair<Double, Long>>> readOutputDataFromJoinedStrea
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, PairOfDoubleAndLongDeserializer.class);
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, numExpectedRecords);
}

}
@@ -21,6 +21,8 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -38,9 +40,9 @@
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

/**
* End-to-end integration test that demonstrates how to remove duplicate records from an input
@@ -212,8 +214,8 @@ public void shouldRemoveDuplicatesFromTheInput() {
final String inputTopic = "inputTopic";
final String outputTopic = "outputTopic";

final KStream<byte[], String> input = builder.stream(inputTopic);
final KStream<byte[], String> deduplicated = input.transform(
final KStream<byte[], String> stream = builder.stream(inputTopic);
final KStream<byte[], String> deduplicated = stream.transform(
// In this example, we assume that the record value as-is represents a unique event ID by
// which we can perform de-duplication. If your records are different, adapt the extractor
// function as needed.
@@ -223,27 +225,26 @@ public void shouldRemoveDuplicatesFromTheInput() {

try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) {
//
// Step 2: Produce some input data to the input topic.
// Step 2: Setup input and output topics.
//
IntegrationTestUtils.produceKeyValuesSynchronously(
inputTopic,
inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()),
topologyTestDriver,
new IntegrationTestUtils.NothingSerde<>(),
new StringSerializer()
);
final TestInputTopic<Void, String> input = topologyTestDriver
.createInputTopic(inputTopic,
new IntegrationTestUtils.NothingSerde<>(),
new StringSerializer());
final TestOutputTopic<Void, String> output = topologyTestDriver
.createOutputTopic(outputTopic,
new IntegrationTestUtils.NothingSerde<>(),
new StringDeserializer());

//
// Step 3: Verify the application's output data.
// Step 3: Produce some input data to the input topic.
//
final List<String> actualValues = IntegrationTestUtils.drainStreamOutput(
outputTopic,
topologyTestDriver,
new IntegrationTestUtils.NothingSerde<>(),
new StringDeserializer()
).stream().map(kv -> kv.value).collect(Collectors.toList());
assertThat(actualValues).containsExactlyElementsOf(expectedValues);
input.pipeValueList(inputValues);

//
// Step 4: Verify the application's output data.
//
assertThat(output.readValuesToList(), equalTo(expectedValues));
}
}

}
}
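
The deduplication and fan-out tests keep this repo's `IntegrationTestUtils.NothingSerde` so that topics whose records carry no key can be piped and read by value only. A rough sketch of that pattern under the new API, assuming `IntegrationTestUtils` from this repository is available and using a trivial pass-through topology (topic names and config values are illustrative):

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

import io.confluent.examples.streams.IntegrationTestUtils;

public class NothingSerdeSketch {

  public static void main(final String[] args) {
    // Trivial pass-through topology; keys are never set, so they stay null end to end.
    final StreamsBuilder builder = new StreamsBuilder();
    builder.<byte[], String>stream("inputTopic").to("outputTopic");

    final Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "nothing-serde-sketch");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

    try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
      // NothingSerde acts as both serializer and deserializer for the always-null keys,
      // so the test only has to deal with values.
      final TestInputTopic<Void, String> input = driver.createInputTopic(
          "inputTopic", new IntegrationTestUtils.NothingSerde<>(), new StringSerializer());
      final TestOutputTopic<Void, String> output = driver.createOutputTopic(
          "outputTopic", new IntegrationTestUtils.NothingSerde<>(), new StringDeserializer());

      input.pipeValueList(Arrays.asList("a", "b", "b", "c"));
      System.out.println(output.readValuesToList());  // Prints: [a, b, b, c]
    }
  }
}
```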
@@ -18,9 +18,10 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.Test;
@@ -30,7 +31,8 @@
import java.util.Properties;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

/**
* End-to-end integration test that demonstrates "fan-out", using an embedded Kafka cluster.
@@ -82,38 +84,34 @@ public void shouldFanoutTheInput() {

try (final TopologyTestDriver topologyTestDriver = new TopologyTestDriver(builder.build(), streamsConfiguration)) {
//
// Step 2: Produce some input data to the input topic.
// Step 2: Setup input and output topics.
//
IntegrationTestUtils.produceKeyValuesSynchronously(
inputTopicA,
inputValues.stream().map(v -> new KeyValue<>(null, v)).collect(Collectors.toList()),
topologyTestDriver,
new IntegrationTestUtils.NothingSerde<>(),
new StringSerializer()
);
final TestInputTopic<Void, String> input = topologyTestDriver
.createInputTopic(inputTopicA,
new IntegrationTestUtils.NothingSerde<>(),
new StringSerializer());
final TestOutputTopic<Void, String> outputB = topologyTestDriver
.createOutputTopic(outputTopicB,
new IntegrationTestUtils.NothingSerde<>(),
new StringDeserializer());
final TestOutputTopic<Void, String> outputC = topologyTestDriver
.createOutputTopic(outputTopicC,
new IntegrationTestUtils.NothingSerde<>(),
new StringDeserializer());

//
// Step 3: Verify the application's output data.
// Step 3: Produce some input data to the input topic.
//
input.pipeValueList(inputValues);

// Verify output topic B
final List<String> actualValuesForB = IntegrationTestUtils.drainStreamOutput(
outputTopicB,
topologyTestDriver,
new IntegrationTestUtils.NothingSerde<>(),
new StringDeserializer()
).stream().map(kv -> kv.value).collect(Collectors.toList());
assertThat(actualValuesForB).isEqualTo(expectedValuesForB);
//
// Step 4: Verify the application's output data.
//

// Verify output topic B
assertThat(outputB.readValuesToList(), equalTo(expectedValuesForB));
// Verify output topic C
final List<String> actualValuesForC = IntegrationTestUtils.drainStreamOutput(
outputTopicC,
topologyTestDriver,
new IntegrationTestUtils.NothingSerde<>(),
new StringDeserializer()
).stream().map(kv -> kv.value).collect(Collectors.toList());
assertThat(actualValuesForC).isEqualTo(expectedValuesForC);
assertThat(outputC.readValuesToList(), equalTo(expectedValuesForC));
}
}

}