From e7569338f52b47345db086d94e7c781024a61d19 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 5 Jun 2019 18:07:48 -0700
Subject: [PATCH 1/4] Tighten wait condition [5.1.x] (#259)

I made a pass over the remaining tests that have not been moved to TopologyTestDriver; they are all somewhat cumbersome to convert, because they either 1) rely on the Avro Schema Registry embedded in CLUSTER, or 2) rely on an existing XXXService that does not integrate easily with TopologyTestDriver. So here is what I did instead.

Generally, many of the flaky tests fail not because the Streams client takes too long to rebalance, but because the broker cluster takes too long to start up and to create / delete topics, and is therefore not yet able to handle the JoinGroup request. For example, the most recent failures fall into this scenario:

https://jenkins.confluent.io/job/confluentinc/job/kafka-streams-examples/job/5.1.x/274/testReport/junit/io.confluent.examples.streams.microservices/ValidationsAggregatorServiceTest/shouldAggregateRuleSuccesses/

https://jenkins.confluent.io/job/confluentinc/job/kafka-streams-examples/job/5.2.x/182/testReport/junit/io.confluent.examples.streams.microservices/OrdersServiceTest/shouldGetOrderByIdWhenOnDifferentHost/

So I tightened the wait condition on topic creation / deletion, which usually happens at test startup, by checking both the ZooKeeper path and the broker's local metadata cache. This also ensures that the broker has completed startup as well as the topic operation. Note that previously we did not wait at all after deleting topics, so when the Streams app started up, the broker could still be busy deleting them, causing timeouts.

Another minor fix: in the interactive-query tests, binding frequently fails because of the randomly generated host. I removed that logic and always use localhost instead; I think this does not lose any generality in test coverage.
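In condensed form, the tightened wait amounts to polling both sources until the topic shows up in each. A minimal sketch (the actual change is in EmbeddedSingleNodeKafkaCluster below; `broker` is the embedded broker handle, and the lambda stands in for the TopicCreatedCondition the patch adds):

```java
import org.apache.kafka.test.TestUtils;

// Wait until the topic is visible both in ZooKeeper and in the broker's own
// metadata cache, which also implies the broker has finished starting up.
private void waitForTopicCreated(final String topic, final long timeoutMs) throws InterruptedException {
  TestUtils.waitForCondition(
      () -> broker.kafkaServer().zkClient().getAllTopicsInCluster().contains(topic)
          && broker.kafkaServer().metadataCache().contains(topic),
      timeoutMs,
      "Topic " + topic + " not created after " + timeoutMs + " milliseconds.");
}
```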
--- .../ApplicationResetIntegrationTest.java | 2 +- .../examples/streams/ExampleTestUtils.java | 10 ---- .../streams/GlobalKTablesExampleTest.java | 2 +- .../streams/JoinLambdaIntegrationTest.java | 22 -------- .../streams/SessionWindowsExampleTest.java | 2 +- .../streams/SpecificAvroIntegrationTest.java | 2 +- .../streams/TopArticlesLambdaExampleTest.java | 2 +- .../streams/WikipediaFeedAvroExampleTest.java | 2 +- .../WikipediaFeedAvroLambdaExampleTest.java | 2 +- ...ordCountInteractiveQueriesExampleTest.java | 3 +- .../kafkamusic/KafkaMusicExampleTest.java | 2 +- .../kafka/EmbeddedSingleNodeKafkaCluster.java | 50 +++++++++++++++++-- .../streams/microservices/EndToEndTest.java | 11 +++- .../microservices/InventoryServiceTest.java | 2 +- .../ValidationsAggregatorServiceTest.java | 2 +- 15 files changed, 66 insertions(+), 50 deletions(-) delete mode 100644 src/test/java/io/confluent/examples/streams/JoinLambdaIntegrationTest.java diff --git a/src/test/java/io/confluent/examples/streams/ApplicationResetIntegrationTest.java b/src/test/java/io/confluent/examples/streams/ApplicationResetIntegrationTest.java index dbc51ea8a4..25b0826b6b 100644 --- a/src/test/java/io/confluent/examples/streams/ApplicationResetIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/ApplicationResetIntegrationTest.java @@ -52,7 +52,7 @@ public class ApplicationResetIntegrationTest { private static final String outputTopic = "my-output-topic"; @BeforeClass - public static void startKafkaCluster() { + public static void startKafkaCluster() throws InterruptedException { CLUSTER.createTopic(inputTopic); CLUSTER.createTopic(outputTopic); } diff --git a/src/test/java/io/confluent/examples/streams/ExampleTestUtils.java b/src/test/java/io/confluent/examples/streams/ExampleTestUtils.java index 1bc54bcddb..322672f8ba 100644 --- a/src/test/java/io/confluent/examples/streams/ExampleTestUtils.java +++ b/src/test/java/io/confluent/examples/streams/ExampleTestUtils.java @@ -12,14 +12,4 @@ public static int randomFreeLocalPort() throws IOException { s.close(); return port; } - - public static String randomValidHost() { - final Random r = new Random(); - - if (r.nextFloat() < 0.1) { - return "localhost"; - } else { - return "127." + r.nextInt(10) + "." + r.nextInt(10) + "." + r.nextInt(256); - } - } } diff --git a/src/test/java/io/confluent/examples/streams/GlobalKTablesExampleTest.java b/src/test/java/io/confluent/examples/streams/GlobalKTablesExampleTest.java index 7d54e0ae84..9d4b4b0118 100644 --- a/src/test/java/io/confluent/examples/streams/GlobalKTablesExampleTest.java +++ b/src/test/java/io/confluent/examples/streams/GlobalKTablesExampleTest.java @@ -56,7 +56,7 @@ public class GlobalKTablesExampleTest { private KafkaStreams streamInstanceTwo; @BeforeClass - public static void createTopics() { + public static void createTopics() throws InterruptedException { CLUSTER.createTopic(ORDER_TOPIC, 4, (short) 1); CLUSTER.createTopic(CUSTOMER_TOPIC, 3, (short) 1); CLUSTER.createTopic(PRODUCT_TOPIC, 2, (short) 1); diff --git a/src/test/java/io/confluent/examples/streams/JoinLambdaIntegrationTest.java b/src/test/java/io/confluent/examples/streams/JoinLambdaIntegrationTest.java deleted file mode 100644 index 07753f3ba6..0000000000 --- a/src/test/java/io/confluent/examples/streams/JoinLambdaIntegrationTest.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.examples.streams; - -/** - * JoinLambdaIntegrationTest was renamed to StreamToTableJoinIntegrationTest. - */ -public class JoinLambdaIntegrationTest { -} diff --git a/src/test/java/io/confluent/examples/streams/SessionWindowsExampleTest.java b/src/test/java/io/confluent/examples/streams/SessionWindowsExampleTest.java index 2b1dd0cba6..707674b8d6 100644 --- a/src/test/java/io/confluent/examples/streams/SessionWindowsExampleTest.java +++ b/src/test/java/io/confluent/examples/streams/SessionWindowsExampleTest.java @@ -55,7 +55,7 @@ public class SessionWindowsExampleTest { private KafkaStreams streams; @BeforeClass - public static void createTopics() { + public static void createTopics() throws InterruptedException { CLUSTER.createTopic(SessionWindowsExample.PLAY_EVENTS); CLUSTER.createTopic(SessionWindowsExample.PLAY_EVENTS_PER_SESSION); } diff --git a/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java b/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java index 6ac00857c4..627fca80b2 100644 --- a/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java @@ -57,7 +57,7 @@ public class SpecificAvroIntegrationTest { private static final String outputTopic = "outputTopic"; @BeforeClass - public static void startKafkaCluster() { + public static void startKafkaCluster() throws InterruptedException { CLUSTER.createTopic(inputTopic); CLUSTER.createTopic(outputTopic); } diff --git a/src/test/java/io/confluent/examples/streams/TopArticlesLambdaExampleTest.java b/src/test/java/io/confluent/examples/streams/TopArticlesLambdaExampleTest.java index 2fa0b6e680..b9335c4116 100644 --- a/src/test/java/io/confluent/examples/streams/TopArticlesLambdaExampleTest.java +++ b/src/test/java/io/confluent/examples/streams/TopArticlesLambdaExampleTest.java @@ -62,7 +62,7 @@ public class TopArticlesLambdaExampleTest { private KafkaStreams streams; @BeforeClass - public static void createTopics() { + public static void createTopics() throws InterruptedException { CLUSTER.createTopic(TopArticlesLambdaExample.TOP_NEWS_PER_INDUSTRY_TOPIC); CLUSTER.createTopic(TopArticlesLambdaExample.PAGE_VIEWS); } diff --git a/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroExampleTest.java b/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroExampleTest.java index 9a260b3989..6d788630df 100644 --- a/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroExampleTest.java +++ b/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroExampleTest.java @@ -52,7 +52,7 @@ public class WikipediaFeedAvroExampleTest { private KafkaStreams streams; @BeforeClass - public static void createTopics() { + public static void createTopics() throws InterruptedException { CLUSTER.createTopic(WikipediaFeedAvroExample.WIKIPEDIA_FEED); CLUSTER.createTopic(WikipediaFeedAvroExample.WIKIPEDIA_STATS); } diff --git a/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExampleTest.java 
b/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExampleTest.java index cbeb27a40e..5c5cc7bd84 100644 --- a/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExampleTest.java +++ b/src/test/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExampleTest.java @@ -56,7 +56,7 @@ public class WikipediaFeedAvroLambdaExampleTest { private KafkaStreams streams; @BeforeClass - public static void createTopics() { + public static void createTopics() throws InterruptedException { CLUSTER.createTopic(WikipediaFeedAvroExample.WIKIPEDIA_FEED); CLUSTER.createTopic(WikipediaFeedAvroExample.WIKIPEDIA_STATS); } diff --git a/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java b/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java index 184d731315..a601454a0c 100644 --- a/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java +++ b/src/test/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExampleTest.java @@ -51,6 +51,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static io.confluent.examples.streams.interactivequeries.WordCountInteractiveQueriesExample.DEFAULT_HOST; import static io.confluent.examples.streams.microservices.util.MicroserviceTestUtils.getWithRetries; import static junit.framework.TestCase.assertTrue; import static junit.framework.TestCase.fail; @@ -124,7 +125,7 @@ public void shutdown() throws Exception { } @Test public void shouldDemonstrateInteractiveQueries() throws Exception { - final String host = ExampleTestUtils.randomValidHost(); + final String host = DEFAULT_HOST; final int port = ExampleTestUtils.randomFreeLocalPort(); final String baseUrl = "http://" + host + ":" + port + "/state"; diff --git a/src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java b/src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java index afcf5f72d0..50fd6bc58b 100644 --- a/src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java +++ b/src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java @@ -186,7 +186,7 @@ public void shutdown() { @Test public void shouldCreateChartsAndAccessThemViaInteractiveQueries() throws Exception { - final String host = ExampleTestUtils.randomValidHost(); + final String host = "localhost"; createStreams(host); streams.start(); diff --git a/src/test/java/io/confluent/examples/streams/kafka/EmbeddedSingleNodeKafkaCluster.java b/src/test/java/io/confluent/examples/streams/kafka/EmbeddedSingleNodeKafkaCluster.java index 92a369b861..624c5ac352 100644 --- a/src/test/java/io/confluent/examples/streams/kafka/EmbeddedSingleNodeKafkaCluster.java +++ b/src/test/java/io/confluent/examples/streams/kafka/EmbeddedSingleNodeKafkaCluster.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; /** * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance, 1 Kafka broker, and 1 @@ -189,7 +190,7 @@ public String schemaRegistryUrl() { * * @param topic The name of the topic. 
*/ - public void createTopic(final String topic) { + public void createTopic(final String topic) throws InterruptedException { createTopic(topic, 1, (short) 1, Collections.emptyMap()); } @@ -200,7 +201,7 @@ public void createTopic(final String topic) { * @param partitions The number of partitions for this topic. * @param replication The replication factor for (the partitions of) this topic. */ - public void createTopic(final String topic, final int partitions, final short replication) { + public void createTopic(final String topic, final int partitions, final short replication) throws InterruptedException { createTopic(topic, partitions, replication, Collections.emptyMap()); } @@ -215,8 +216,28 @@ public void createTopic(final String topic, final int partitions, final short re public void createTopic(final String topic, final int partitions, final short replication, - final Map topicConfig) { + final Map topicConfig) throws InterruptedException { + createTopic(60000L, topic, partitions, replication, topicConfig); + } + + /** + * Creates a Kafka topic with the given parameters and blocks until the topic has been created. + * + * @param timeoutMs How long (in milliseconds) to wait for the topic to appear in ZooKeeper and in the broker's metadata cache; a value of zero or less skips the wait. + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. + * @param replication The replication factor for (the partitions of) this topic. + * @param topicConfig Additional topic-level configuration settings. + */ + public void createTopic(final long timeoutMs, + final String topic, + final int partitions, + final short replication, + final Map topicConfig) throws InterruptedException { broker.createTopic(topic, partitions, replication, topicConfig); + + if (timeoutMs > 0) { + TestUtils.waitForCondition(new TopicCreatedCondition(topic), timeoutMs, "Topic not created after " + timeoutMs + " milliseconds."); + } } /** @@ -253,9 +274,28 @@ private TopicsDeletedCondition(final String...
topics) { @Override public boolean conditionMet() { //TODO once KAFKA-6098 is fixed use AdminClient to verify topics have been deleted - final Set allTopics = new HashSet<>( + final Set allTopicsFromZk = new HashSet<>( JavaConverters.seqAsJavaListConverter(broker.kafkaServer().zkClient().getAllTopicsInCluster()).asJava()); - return !allTopics.removeAll(deletedTopics); + + final Set allTopicsFromBrokerCache = new HashSet<>( + JavaConverters.seqAsJavaListConverter(broker.kafkaServer().metadataCache().getAllTopics().toSeq()).asJava()); + + return !allTopicsFromZk.removeAll(deletedTopics) && !allTopicsFromBrokerCache.removeAll(deletedTopics); + } + } + + private final class TopicCreatedCondition implements TestCondition { + final String createdTopic; + + private TopicCreatedCondition(final String topic) { + createdTopic = topic; + } + + @Override + public boolean conditionMet() { + //TODO once KAFKA-6098 is fixed use AdminClient to verify the topic has been created + return broker.kafkaServer().zkClient().getAllTopicsInCluster().contains(createdTopic) && + broker.kafkaServer().metadataCache().contains(createdTopic); + } } diff --git a/src/test/java/io/confluent/examples/streams/microservices/EndToEndTest.java b/src/test/java/io/confluent/examples/streams/microservices/EndToEndTest.java index 969074511e..585655b25f 100644 --- a/src/test/java/io/confluent/examples/streams/microservices/EndToEndTest.java +++ b/src/test/java/io/confluent/examples/streams/microservices/EndToEndTest.java @@ -33,13 +33,13 @@ import static java.util.Arrays.asList; import static javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.junit.Assert.fail; public class EndToEndTest extends MicroserviceTestUtils { private static final Logger log = LoggerFactory.getLogger(EndToEndTest.class); private static final String HOST = "localhost"; private final List services = new ArrayList<>(); - private static int restPort; private OrderBean returnedBean; private long startTime; private Paths path; @@ -166,7 +166,14 @@ public void startEverythingElse() throws Exception { CLUSTER.start(); } - Topics.ALL.keySet().forEach(CLUSTER::createTopic); + Topics.ALL.keySet().forEach(topic -> { + try { + CLUSTER.createTopic(topic); + } catch (final InterruptedException e) { + fail("Cannot create topics in time"); + } + }); + Schemas.configureSerdesWithSchemaRegistryUrl(CLUSTER.schemaRegistryUrl()); services.add(new FraudService()); diff --git a/src/test/java/io/confluent/examples/streams/microservices/InventoryServiceTest.java b/src/test/java/io/confluent/examples/streams/microservices/InventoryServiceTest.java index e614c1f617..6027962902 100644 --- a/src/test/java/io/confluent/examples/streams/microservices/InventoryServiceTest.java +++ b/src/test/java/io/confluent/examples/streams/microservices/InventoryServiceTest.java @@ -38,7 +38,7 @@ public class InventoryServiceTest extends MicroserviceTestUtils { @BeforeClass - public static void startKafkaCluster() { + public static void startKafkaCluster() throws InterruptedException { CLUSTER.createTopic(Topics.ORDERS.name()); CLUSTER.createTopic(Topics.ORDER_VALIDATIONS.name()); Schemas.configureSerdesWithSchemaRegistryUrl(CLUSTER.schemaRegistryUrl()); diff --git a/src/test/java/io/confluent/examples/streams/microservices/ValidationsAggregatorServiceTest.java b/src/test/java/io/confluent/examples/streams/microservices/ValidationsAggregatorServiceTest.java index ae2c29d6fe..a036b80189 ---
a/src/test/java/io/confluent/examples/streams/microservices/ValidationsAggregatorServiceTest.java +++ b/src/test/java/io/confluent/examples/streams/microservices/ValidationsAggregatorServiceTest.java @@ -32,7 +32,7 @@ public class ValidationsAggregatorServiceTest extends MicroserviceTestUtils { @BeforeClass - public static void startKafkaCluster() { + public static void startKafkaCluster() throws InterruptedException { CLUSTER.createTopic(Topics.ORDERS.name()); CLUSTER.createTopic(Topics.ORDER_VALIDATIONS.name()); Schemas.configureSerdesWithSchemaRegistryUrl(CLUSTER.schemaRegistryUrl()); From 8d24071f668214eab58e0085cd5656eea2f127bf Mon Sep 17 00:00:00 2001 From: Mohinish Date: Wed, 5 Jun 2019 18:28:15 -0700 Subject: [PATCH 2/4] Bump Confluent to 5.1.4-SNAPSHOT, Kafka to 2.1.2-SNAPSHOT --- docker-compose.yml | 12 ++++++------ pom.xml | 4 ++-- .../examples/streams/microservices/README.md | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index f3b2e8d3aa..d311583028 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: '2' services: zookeeper: - image: confluentinc/cp-zookeeper:5.1.3-SNAPSHOT + image: confluentinc/cp-zookeeper:5.1.4-SNAPSHOT hostname: zookeeper ports: - '32181:32181' @@ -13,7 +13,7 @@ services: - "moby:127.0.0.1" kafka: - image: confluentinc/cp-enterprise-kafka:5.1.3-SNAPSHOT + image: confluentinc/cp-enterprise-kafka:5.1.4-SNAPSHOT hostname: kafka ports: - '9092:9092' @@ -38,7 +38,7 @@ services: - "moby:127.0.0.1" schema-registry: - image: confluentinc/cp-schema-registry:5.1.3-SNAPSHOT + image: confluentinc/cp-schema-registry:5.1.4-SNAPSHOT hostname: schema-registry depends_on: - zookeeper @@ -54,7 +54,7 @@ services: # This "container" is a workaround to pre-create topics for the Kafka Music application # until we have a more elegant way to do that. kafka-create-topics: - image: confluentinc/cp-kafka:5.1.3-SNAPSHOT + image: confluentinc/cp-kafka:5.1.4-SNAPSHOT depends_on: - kafka hostname: kafka-create-topics @@ -78,7 +78,7 @@ services: # Continuously generates input data for the Kafka Music application. kafka-music-data-generator: - image: confluentinc/kafka-streams-examples:5.0.0 + image: confluentinc/kafka-streams-examples:5.1.4-SNAPSHOT hostname: kafka-music-data-generator depends_on: - kafka @@ -103,7 +103,7 @@ services: # Runs the Kafka Music application. kafka-music-application: - image: confluentinc/kafka-streams-examples:5.0.0 + image: confluentinc/kafka-streams-examples:5.1.4-SNAPSHOT hostname: kafka-music-application depends_on: - kafka diff --git a/pom.xml b/pom.xml index f23b8be3e9..37177e47df 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ io.confluent rest-utils-parent - 5.1.3-SNAPSHOT + 5.1.4-SNAPSHOT kafka-streams-examples @@ -54,7 +54,7 @@ false false 1.8 - 5.1.3-SNAPSHOT + 5.1.4-SNAPSHOT UTF-8 ${kafka.scala.version}.8 2.2.6 diff --git a/src/main/java/io/confluent/examples/streams/microservices/README.md b/src/main/java/io/confluent/examples/streams/microservices/README.md index b595ca1c92..f2aacec035 100644 --- a/src/main/java/io/confluent/examples/streams/microservices/README.md +++ b/src/main/java/io/confluent/examples/streams/microservices/README.md @@ -34,7 +34,7 @@ Then run the fully-working demo [end-to-end](https://docs.confluent.io/current/t It runs the ecosystem and all the microservices for you including Kafka Connect, Elasticsearch, KSQL and Control Center. 
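As the next paragraph explains, each test class in this repository boots its own self-contained cluster, which is what makes the test-based workflow practical. A minimal sketch of such a test (hypothetical class and topic names, using the repo's JUnit rule):

```java
import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class MyPlaygroundTest {
  // Boots ZooKeeper, one Kafka broker, and a Schema Registry for this class only.
  @ClassRule
  public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

  @BeforeClass
  public static void createTopics() throws InterruptedException {
    CLUSTER.createTopic("my-input-topic"); // blocks until ZK and the broker cache agree
  }

  @Test
  public void shouldExperiment() {
    // Point clients at CLUSTER.bootstrapServers() and CLUSTER.schemaRegistryUrl().
  }
}
```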
To play with this ecosystem the simplest way is to run the tests and fiddle with the code (stand alone execution is only supported in branch 5.0.0+ so go there if you want stand alone or docker support). Each test boots a self-contained Kafka cluster so it's easy to play with different queries and configurations. -The best place to start is [EndToEndTest.java](https://github.com/confluentinc/kafka-streams-examples/blob/3.3.1-post/src/test/java/io/confluent/examples/streams/microservices/EndToEndTest.java) +The best place to start is [EndToEndTest.java](https://github.com/confluentinc/kafka-streams-examples/blob/5.1.4-SNAPSHOT-post/src/test/java/io/confluent/examples/streams/microservices/EndToEndTest.java) # Running the Examples: * Requires Java 1.8 From b55510b1112ae6570ee98bb6c4c8b7056e037894 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Fri, 21 Jun 2019 11:17:21 -0500 Subject: [PATCH 3/4] Use upstream mock SR support (#260) * Remove the (hacky) mock schema registry serdes * demonstrate use of Schema Registry mock URLs (https://github.com/confluentinc/schema-registry/pull/1141) --- .../streams/SessionWindowsExample.java | 22 +-- .../kafkamusic/KafkaMusicExample.java | 30 ++-- .../streams/GenericAvroIntegrationTest.java | 34 ++-- .../streams/IntegrationTestUtils.java | 25 ++- .../streams/SessionWindowsExampleTest.java | 152 +++++++++--------- .../kafkamusic/KafkaMusicExampleTest.java | 40 +++-- .../utils/MockGenericAvroDeserializer.java | 42 ----- .../streams/utils/MockGenericAvroSerde.java | 46 ------ .../utils/MockGenericAvroSerializer.java | 42 ----- 9 files changed, 174 insertions(+), 259 deletions(-) delete mode 100644 src/test/java/io/confluent/examples/streams/utils/MockGenericAvroDeserializer.java delete mode 100644 src/test/java/io/confluent/examples/streams/utils/MockGenericAvroSerde.java delete mode 100644 src/test/java/io/confluent/examples/streams/utils/MockGenericAvroSerializer.java diff --git a/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java b/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java index f41cc655c4..df637aadd0 100644 --- a/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java +++ b/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java @@ -26,17 +26,19 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.state.SessionStore; -import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; +import static java.util.Collections.singletonMap; + /** * Demonstrates counting user activity (play-events) into Session Windows *

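The mock-registry pattern this patch adopts, in sketch form (the scope name is arbitrary; serdes configured with the same mock:// URL share one in-memory registry, which tests should drop afterwards):

```java
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

import java.util.Collections;
import java.util.Map;

public class MockRegistrySketch {
  public static void main(final String[] args) {
    final String scope = "my-test-scope"; // hypothetical; typically the test class name
    final Map<String, String> serdeConfig = Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://" + scope);
    // The same in-memory registry is reachable programmatically, e.g. to pre-register schemas:
    final SchemaRegistryClient client = MockSchemaRegistry.getClientForScope(scope);
    // ... configure serdes with serdeConfig and run the topology under test ...
    MockSchemaRegistry.dropScope(scope); // avoid leaking schemas between tests
  }
}
```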
@@ -112,9 +114,10 @@ public class SessionWindowsExample { public static void main(final String[] args) { final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092"; final String schemaRegistryUrl = args.length > 1 ? args[1] : "http://localhost:8081"; - final KafkaStreams streams = createStreams(bootstrapServers, - schemaRegistryUrl, - "/tmp/kafka-streams"); + final KafkaStreams streams = new KafkaStreams( + buildTopology(singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)), + new StreamsConfig(streamsConfig(bootstrapServers, "/tmp/kafka-streams")) + ); // Always (and unconditionally) clean local state prior to starting the processing topology. // We opt for this unconditional call here because this will make it easier for you to play around with the example @@ -134,9 +137,7 @@ public static void main(final String[] args) { Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } - static KafkaStreams createStreams(final String bootstrapServers, - final String schemaRegistryUrl, - final String stateDir) { + static Properties streamsConfig(final String bootstrapServers, final String stateDir) { final Properties config = new Properties(); // Give the Streams application a unique name. The name must be unique in the Kafka cluster // against which the application is run. @@ -150,11 +151,12 @@ static KafkaStreams createStreams(final String bootstrapServers, config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // disable caching to see session merging config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + return config; + } + static Topology buildTopology(final Map serdeConfig) { // create and configure the SpecificAvroSerdes required in this example final SpecificAvroSerde playEventSerde = new SpecificAvroSerde<>(); - final Map serdeConfig = Collections.singletonMap( - AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); playEventSerde.configure(serdeConfig, false); final StreamsBuilder builder = new StreamsBuilder(); @@ -174,7 +176,7 @@ static KafkaStreams createStreams(final String bootstrapServers, // write to play-events-per-session topic .to(PLAY_EVENTS_PER_SESSION, Produced.with(Serdes.String(), Serdes.Long())); - return new KafkaStreams(builder.build(), new StreamsConfig(config)); + return builder.build(); } } diff --git a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java index 87a84e8c17..b338ff15d9 100644 --- a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java +++ b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -44,13 +45,14 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.TreeSet; +import static java.util.Collections.singletonMap; + /** * Demonstrates how to locate and query state stores (Interactive 
Queries). * @@ -197,11 +199,10 @@ public static void main(final String[] args) throws Exception { System.out.println("Connecting to Confluent schema registry at " + schemaRegistryUrl); System.out.println("REST endpoint at http://" + restEndpointHostname + ":" + restEndpointPort); - final KafkaStreams streams = createChartsStreams(bootstrapServers, - schemaRegistryUrl, - restEndpointPort, - "/tmp/kafka-streams", - restEndpointHostname); + final KafkaStreams streams = new KafkaStreams( + buildTopology(singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)), + streamsConfig(bootstrapServers, restEndpointPort, "/tmp/kafka-streams", restEndpointHostname) + ); // Always (and unconditionally) clean local state prior to starting the processing topology. // We opt for this unconditional call here because this will make it easier for you to play around with the example @@ -242,11 +243,10 @@ static MusicPlaysRestService startRestProxy(final KafkaStreams streams, final Ho return interactiveQueriesRestService; } - static KafkaStreams createChartsStreams(final String bootstrapServers, - final String schemaRegistryUrl, - final int applicationServerPort, - final String stateDir, - final String host) { + static Properties streamsConfig(final String bootstrapServers, + final int applicationServerPort, + final String stateDir, + final String host) { final Properties streamsConfiguration = new Properties(); // Give the Streams application a unique name. The name must be unique in the Kafka cluster // against which the application is run. @@ -278,10 +278,11 @@ static KafkaStreams createChartsStreams(final String bootstrapServers, } catch (final NumberFormatException ignored) { } } + return streamsConfiguration; + } + static Topology buildTopology(final Map serdeConfig) { // create and configure the SpecificAvroSerdes required in this example - final Map serdeConfig = Collections.singletonMap( - AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); final SpecificAvroSerde playEventSerde = new SpecificAvroSerde<>(); playEventSerde.configure(serdeConfig, false); @@ -376,8 +377,7 @@ static KafkaStreams createChartsStreams(final String bootstrapServers, .withValueSerde(topFiveSerde) ); - return new KafkaStreams(builder.build(), streamsConfiguration); - + return builder.build(); } /** diff --git a/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java b/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java index 205040fb54..9a50a5ce6e 100644 --- a/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java +++ b/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java @@ -15,8 +15,8 @@ */ package io.confluent.examples.streams; -import io.confluent.examples.streams.utils.MockGenericAvroSerde; -import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; @@ -40,8 +40,6 @@ import java.util.Properties; import java.util.stream.Collectors; -import static io.confluent.examples.streams.IntegrationTestUtils.mkEntry; -import static io.confluent.examples.streams.IntegrationTestUtils.mkMap; import static org.assertj.core.api.Assertions.assertThat; /** @@ 
-51,18 +49,24 @@ */ public class GenericAvroIntegrationTest { + // A mocked schema registry for our serdes to use + private static final String SCHEMA_REGISTRY_SCOPE = GenericAvroIntegrationTest.class.getName(); + private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://" + SCHEMA_REGISTRY_SCOPE; + + private static String inputTopic = "inputTopic"; private static String outputTopic = "outputTopic"; @Test public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception { - final MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient(); final Schema schema = new Schema.Parser().parse( getClass().getResourceAsStream("/avro/io/confluent/examples/streams/wikifeed.avsc") ); - mockSchemaRegistryClient.register("inputTopic-value", schema); + final SchemaRegistryClient schemaRegistryClient = MockSchemaRegistry.getClientForScope(SCHEMA_REGISTRY_SCOPE); + + schemaRegistryClient.register("inputTopic-value", schema); final GenericRecord record = new GenericData.Record(schema); record.put("user", "alice"); @@ -79,9 +83,8 @@ public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception { streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "generic-avro-integration-test"); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy config"); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName()); - streamsConfiguration.put("mock.schema.registry.client", mockSchemaRegistryClient); - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MockGenericAvroSerde.class); - streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "dummy config"); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class); + streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Write the input data as-is to the output topic. @@ -95,12 +98,12 @@ public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception { // However, in the code below we intentionally override the default serdes in `to()` to // demonstrate how you can construct and configure a generic Avro serde manually. final Serde stringSerde = Serdes.String(); - final Serde genericAvroSerde = new MockGenericAvroSerde(mockSchemaRegistryClient); - // Note how we must manually call `configure()` on this serde to configure the schema registry - // url. This is different from the case of setting default serdes (see `streamsConfiguration` + final Serde genericAvroSerde = new GenericAvroSerde(); + // Note how we must manually call `configure()` on this serde to configure the schema registry. + // This is different from the case of setting default serdes (see `streamsConfiguration` // above), which will be auto-configured based on the `StreamsConfiguration` instance. 
genericAvroSerde.configure( - Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "dummy config"), + Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL), /*isKey*/ false); final KStream stream = builder.stream(inputTopic); stream.to(outputTopic, Produced.with(stringSerde, genericAvroSerde)); @@ -118,7 +121,7 @@ public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception { inputValues.stream().map(v -> new KeyValue<>(null, (Object) v)).collect(Collectors.toList()), topologyTestDriver, new IntegrationTestUtils.NothingSerde<>(), - new KafkaAvroSerializer(mockSchemaRegistryClient) + new KafkaAvroSerializer(schemaRegistryClient) ); // @@ -128,11 +131,12 @@ public void shouldRoundTripGenericAvroDataThroughKafka() throws Exception { outputTopic, topologyTestDriver, new IntegrationTestUtils.NothingSerde<>(), - new KafkaAvroDeserializer(mockSchemaRegistryClient) + new KafkaAvroDeserializer(schemaRegistryClient) ).stream().map(kv -> (GenericRecord) kv.value).collect(Collectors.toList()); assertThat(actualValues).isEqualTo(inputValues); } finally { topologyTestDriver.close(); + MockSchemaRegistry.dropScope(SCHEMA_REGISTRY_SCOPE); } } diff --git a/src/test/java/io/confluent/examples/streams/IntegrationTestUtils.java b/src/test/java/io/confluent/examples/streams/IntegrationTestUtils.java index 71d54be02d..b21604371e 100644 --- a/src/test/java/io/confluent/examples/streams/IntegrationTestUtils.java +++ b/src/test/java/io/confluent/examples/streams/IntegrationTestUtils.java @@ -365,12 +365,35 @@ static void produceKeyValuesSynchronously(final String topic, final TopologyTestDriver topologyTestDriver, final Serializer keySerializer, final Serializer valueSerializer) { + final int timestamp = 0; + produceKeyValuesSynchronously(topic, records, topologyTestDriver, keySerializer, valueSerializer, timestamp); + } + + /** + * Like {@link IntegrationTestUtils#produceKeyValuesSynchronously(String, Collection, Properties)}, except for use + * with TopologyTestDriver tests, rather than "native" Kafka broker tests. 
+ * + * @param topic Kafka topic to write the data records to + * @param records Data records to write to Kafka + * @param topologyTestDriver The {@link TopologyTestDriver} to send the data records to + * @param keySerializer The {@link Serializer} corresponding to the key type + * @param valueSerializer The {@link Serializer} corresponding to the value type + * @param timestamp The timestamp to use for the produced records + * @param Key type of the data records + * @param Value type of the data records + */ + static void produceKeyValuesSynchronously(final String topic, + final List> records, + final TopologyTestDriver topologyTestDriver, + final Serializer keySerializer, + final Serializer valueSerializer, + final long timestamp) { for (final KeyValue entity : records) { final ConsumerRecord consumerRecord = new ConsumerRecord<>( topic, 0, 0, - 0, + timestamp, TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM, ConsumerRecord.NULL_SIZE, diff --git a/src/test/java/io/confluent/examples/streams/SessionWindowsExampleTest.java b/src/test/java/io/confluent/examples/streams/SessionWindowsExampleTest.java index eeb90ebfb6..926f1987c2 100644 --- a/src/test/java/io/confluent/examples/streams/SessionWindowsExampleTest.java +++ b/src/test/java/io/confluent/examples/streams/SessionWindowsExampleTest.java @@ -16,26 +16,21 @@ package io.confluent.examples.streams; import io.confluent.examples.streams.avro.PlayEvent; -import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster; +import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlySessionStore; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; import org.junit.Test; import java.util.ArrayList; @@ -43,73 +38,61 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Properties; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; public class SessionWindowsExampleTest { - @ClassRule - public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); - private KafkaStreams streams; + // A mocked schema registry for our serdes to use + private static final String SCHEMA_REGISTRY_SCOPE = SessionWindowsExampleTest.class.getName(); + private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://" + SCHEMA_REGISTRY_SCOPE; - @BeforeClass - public static void createTopics() { - 
CLUSTER.createTopic(SessionWindowsExample.PLAY_EVENTS); - CLUSTER.createTopic(SessionWindowsExample.PLAY_EVENTS_PER_SESSION); - } + + private TopologyTestDriver streams; + private final Map AVRO_SERDE_CONFIG = Collections.singletonMap( + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL + ); @Before public void createStreams() { - streams = - SessionWindowsExample.createStreams(CLUSTER.bootstrapServers(), - CLUSTER.schemaRegistryUrl(), - TestUtils.tempDirectory().getPath()); - streams.start(); + streams = new TopologyTestDriver( + SessionWindowsExample.buildTopology(AVRO_SERDE_CONFIG), + SessionWindowsExample.streamsConfig("dummy", TestUtils.tempDirectory().getPath()) + ); } @After public void closeStreams() { streams.close(); + MockSchemaRegistry.dropScope(SCHEMA_REGISTRY_SCOPE); } @Test public void shouldCountPlayEventsBySession() throws Exception { - final Map serdeConfig = Collections.singletonMap( - AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl()); final SpecificAvroSerializer playEventSerializer = new SpecificAvroSerializer<>(); - playEventSerializer.configure(serdeConfig, false); - - final Properties producerProperties = new Properties(); - producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - - final KafkaProducer playEventProducer = new KafkaProducer<>(producerProperties, - Serdes.String() .serializer(), - playEventSerializer); - - final Properties consumerProps = new Properties(); - consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "session-windows-consumer"); - consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass()); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.Long().deserializer().getClass()); - + playEventSerializer.configure(AVRO_SERDE_CONFIG, false); final long start = System.currentTimeMillis(); final String userId = "erica"; - playEventProducer.send(new ProducerRecord<>(SessionWindowsExample.PLAY_EVENTS, - null, - start, - userId, - new PlayEvent(1L, 10L))); - - final List> - firstSession = - IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProps, - SessionWindowsExample.PLAY_EVENTS_PER_SESSION, - 1); + IntegrationTestUtils.produceKeyValuesSynchronously( + SessionWindowsExample.PLAY_EVENTS, + Collections.singletonList( + new KeyValue<>(userId, new PlayEvent(1L, 10L)) + ), + streams, + new StringSerializer(), + playEventSerializer, + start + ); + + final List> firstSession = IntegrationTestUtils.drainStreamOutput( + SessionWindowsExample.PLAY_EVENTS_PER_SESSION, + streams, + new StringDeserializer(), + new LongDeserializer() + ); // should have a session for erica with start and end time the same assertThat(firstSession.get(0), equalTo(KeyValue.pair(userId + "@" +start+"->"+start, 1L))); @@ -117,7 +100,7 @@ public void shouldCountPlayEventsBySession() throws Exception { // also look in the store to find the same session final ReadOnlySessionStore playEventsPerSession = - streams.store(SessionWindowsExample.PLAY_EVENTS_PER_SESSION, QueryableStoreTypes.sessionStore()); + streams.getSessionStore(SessionWindowsExample.PLAY_EVENTS_PER_SESSION); final KeyValue, Long> next = fetchSessionsFromLocalStore(userId, playEventsPerSession).get(0); assertThat(next.key, equalTo(new Windowed<>(userId, new SessionWindow(start, 
start)))); @@ -125,17 +108,24 @@ public void shouldCountPlayEventsBySession() throws Exception { // send another event that is after the inactivity gap, so we have 2 independent sessions final long secondSessionStart = start + SessionWindowsExample.INACTIVITY_GAP + 1; - playEventProducer.send(new ProducerRecord<>(SessionWindowsExample.PLAY_EVENTS, - null, - secondSessionStart, - userId, - new PlayEvent(2L, 10L))); - - final List> - secondSession = - IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProps, - SessionWindowsExample.PLAY_EVENTS_PER_SESSION, - 1); + IntegrationTestUtils.produceKeyValuesSynchronously( + SessionWindowsExample.PLAY_EVENTS, + Collections.singletonList( + new KeyValue<>(userId, new PlayEvent(2L, 10L)) + ), + streams, + new StringSerializer(), + playEventSerializer, + secondSessionStart + ); + + final List> secondSession = IntegrationTestUtils.drainStreamOutput( + SessionWindowsExample.PLAY_EVENTS_PER_SESSION, + streams, + new StringDeserializer(), + new LongDeserializer() + ); + // should have created a new session assertThat(secondSession.get(0), equalTo(KeyValue.pair(userId + "@" + secondSessionStart + "->" + secondSessionStart, 1L))); @@ -147,20 +137,24 @@ public void shouldCountPlayEventsBySession() throws Exception { // create an event between the two sessions to demonstrate merging final long mergeTime = start + SessionWindowsExample.INACTIVITY_GAP / 2; - playEventProducer.send(new ProducerRecord<>(SessionWindowsExample.PLAY_EVENTS, - null, - mergeTime, - userId, - new PlayEvent(3L, 10L))); - - playEventProducer.close(); - - - final List> - merged = - IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProps, - SessionWindowsExample.PLAY_EVENTS_PER_SESSION, - 3); + IntegrationTestUtils.produceKeyValuesSynchronously( + SessionWindowsExample.PLAY_EVENTS, + Collections.singletonList( + new KeyValue<>(userId, new PlayEvent(3L, 10L)) + ), + streams, + new StringSerializer(), + playEventSerializer, + mergeTime + ); + + + final List> merged = IntegrationTestUtils.drainStreamOutput( + SessionWindowsExample.PLAY_EVENTS_PER_SESSION, + streams, + new StringDeserializer(), + new LongDeserializer() + ); // should have merged all sessions into one and sent tombstones for the sessions that were // merged assertThat(merged, equalTo(Arrays.asList(KeyValue.pair(userId + "@" +start+"->"+start, null), diff --git a/src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java b/src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java index 7c0bcfa22d..fe27e926a3 100644 --- a/src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java +++ b/src/test/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleTest.java @@ -19,6 +19,9 @@ import io.confluent.examples.streams.avro.Song; import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster; import io.confluent.examples.streams.microservices.util.MicroserviceTestUtils; +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry; import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -33,6 +36,7 @@ import org.apache.kafka.streams.state.StreamsMetadata; 
import org.apache.kafka.test.TestUtils; import org.junit.After; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -71,13 +75,18 @@ public class KafkaMusicExampleTest { @ClassRule public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster(); + + // A mocked schema registry for our serdes to use + private static final String SCHEMA_REGISTRY_SCOPE = KafkaMusicExample.class.getName(); + private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://" + SCHEMA_REGISTRY_SCOPE; + private static final int MAX_WAIT_MS = 30000; private KafkaStreams streams; private MusicPlaysRestService restProxy; private int appServerPort; private static final List songs = new ArrayList<>(); private static final Logger log = LoggerFactory.getLogger(KafkaMusicExampleTest.class); - + @BeforeClass public static void createTopicsAndProduceDataToInputTopics() throws Exception { CLUSTER.createTopic(KafkaMusicExample.PLAY_EVENTS); @@ -108,10 +117,10 @@ public static void createTopicsAndProduceDataToInputTopics() throws Exception { // Produce sample data to the input topic before the tests starts. final Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - + final Map serdeConfig = Collections.singletonMap( - AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl()); - + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL + ); final SpecificAvroSerializer playEventSerializer = new SpecificAvroSerializer<>(); playEventSerializer.configure(serdeConfig, false); @@ -152,13 +161,26 @@ public static void createTopicsAndProduceDataToInputTopics() throws Exception { playEventProducer.close(); } + @AfterClass + public static void cleanup() { + MockSchemaRegistry.dropScope(SCHEMA_REGISTRY_SCOPE); + } + private void createStreams(final String host) throws Exception { appServerPort = randomFreeLocalPort(); - streams = KafkaMusicExample.createChartsStreams(CLUSTER.bootstrapServers(), - CLUSTER.schemaRegistryUrl(), - appServerPort, - TestUtils.tempDirectory().getPath(), - host); + + final Map serdeConfig = Collections.singletonMap( + AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL + ); + streams = new KafkaStreams( + KafkaMusicExample.buildTopology(serdeConfig), + KafkaMusicExample.streamsConfig( + CLUSTER.bootstrapServers(), + appServerPort, + TestUtils.tempDirectory().getPath(), + host + ) + ); int count = 0; final int maxTries = 3; while (count <= maxTries) { diff --git a/src/test/java/io/confluent/examples/streams/utils/MockGenericAvroDeserializer.java b/src/test/java/io/confluent/examples/streams/utils/MockGenericAvroDeserializer.java deleted file mode 100644 index 86e29e2520..0000000000 --- a/src/test/java/io/confluent/examples/streams/utils/MockGenericAvroDeserializer.java +++ /dev/null @@ -1,42 +0,0 @@ -package io.confluent.examples.streams.utils; - -import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.common.serialization.Deserializer; - -import java.util.Map; - -public class MockGenericAvroDeserializer implements Deserializer { - private KafkaAvroDeserializer inner; - private final boolean constructedWithClient; - - MockGenericAvroDeserializer() { - constructedWithClient = false; - } - - MockGenericAvroDeserializer(final 
MockSchemaRegistryClient mockSchemaRegistryClient) { - inner = new KafkaAvroDeserializer(mockSchemaRegistryClient); - constructedWithClient = true; - } - - @Override - public void configure(final Map configs, final boolean isKey) { - if (!constructedWithClient) { - final MockSchemaRegistryClient mockSchemaRegistryClient = - (MockSchemaRegistryClient) configs.get("mock.schema.registry.client"); - inner = new KafkaAvroDeserializer(mockSchemaRegistryClient); - } - inner.configure(configs, isKey); - } - - @Override - public GenericRecord deserialize(final String topic, final byte[] data) { - return (GenericRecord) inner.deserialize(topic, data); - } - - @Override - public void close() { - inner.close(); - } -} diff --git a/src/test/java/io/confluent/examples/streams/utils/MockGenericAvroSerde.java b/src/test/java/io/confluent/examples/streams/utils/MockGenericAvroSerde.java deleted file mode 100644 index 7a349fe64a..0000000000 --- a/src/test/java/io/confluent/examples/streams/utils/MockGenericAvroSerde.java +++ /dev/null @@ -1,46 +0,0 @@ -package io.confluent.examples.streams.utils; - -import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serializer; - -import java.util.Map; - -public class MockGenericAvroSerde implements Serde { - private final MockGenericAvroSerializer serializer; - private final MockGenericAvroDeserializer deserializer; - - public MockGenericAvroSerde() { - serializer = new MockGenericAvroSerializer(); - deserializer = new MockGenericAvroDeserializer(); - } - - public MockGenericAvroSerde(final MockSchemaRegistryClient mockSchemaRegistryClient) { - serializer = new MockGenericAvroSerializer(mockSchemaRegistryClient); - deserializer = new MockGenericAvroDeserializer(mockSchemaRegistryClient); - } - - @Override - public void configure(final Map configs, final boolean isKey) { - serializer.configure(configs, isKey); - deserializer.configure(configs, isKey); - } - - @Override - public void close() { - serializer.close(); - deserializer.close(); - } - - @Override - public Serializer serializer() { - return serializer; - } - - @Override - public Deserializer deserializer() { - return deserializer; - } -} diff --git a/src/test/java/io/confluent/examples/streams/utils/MockGenericAvroSerializer.java b/src/test/java/io/confluent/examples/streams/utils/MockGenericAvroSerializer.java deleted file mode 100644 index 68e0806d01..0000000000 --- a/src/test/java/io/confluent/examples/streams/utils/MockGenericAvroSerializer.java +++ /dev/null @@ -1,42 +0,0 @@ -package io.confluent.examples.streams.utils; - -import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import org.apache.avro.generic.GenericRecord; -import org.apache.kafka.common.serialization.Serializer; - -import java.util.Map; - -public class MockGenericAvroSerializer implements Serializer { - private KafkaAvroSerializer inner; - private final boolean constructedWithClient; - - MockGenericAvroSerializer() { - constructedWithClient = false; - } - - MockGenericAvroSerializer(final MockSchemaRegistryClient mockSchemaRegistryClient) { - inner = new KafkaAvroSerializer(mockSchemaRegistryClient); - constructedWithClient = true; - } - - @Override - public void configure(final Map configs, final boolean isKey) { - if 
(!constructedWithClient) { - final MockSchemaRegistryClient mockSchemaRegistryClient = - (MockSchemaRegistryClient) configs.get("mock.schema.registry.client"); - inner = new KafkaAvroSerializer(mockSchemaRegistryClient); - } - inner.configure(configs, isKey); - } - - @Override - public byte[] serialize(final String topic, final GenericRecord data) { - return inner.serialize(topic, data); - } - - @Override - public void close() { - inner.close(); - } -} From cda972c2899a821139c8c37cd56ef8ead17bd836 Mon Sep 17 00:00:00 2001 From: Confluent Jenkins Bot Date: Tue, 31 Mar 2020 20:31:57 +0000 Subject: [PATCH 4/4] Set Confluent to 5.1.4, Kafka to 2.1.1-cp6. --- README.md | 6 +++--- docker-compose.yml | 14 +++++++------- pom.xml | 4 ++-- .../streams/AnomalyDetectionLambdaExample.java | 2 +- .../examples/streams/MapFunctionLambdaExample.java | 2 +- .../examples/streams/PageViewRegionExample.java | 4 ++-- .../streams/PageViewRegionExampleDriver.java | 2 +- .../streams/PageViewRegionLambdaExample.java | 4 ++-- .../streams/SecureKafkaStreamsExample.java | 2 +- .../examples/streams/SessionWindowsExample.java | 4 ++-- .../streams/SessionWindowsExampleDriver.java | 2 +- .../examples/streams/SumLambdaExample.java | 4 ++-- .../examples/streams/SumLambdaExampleDriver.java | 2 +- .../examples/streams/TopArticlesExampleDriver.java | 2 +- .../examples/streams/TopArticlesLambdaExample.java | 4 ++-- .../examples/streams/UserRegionLambdaExample.java | 2 +- .../examples/streams/WikipediaFeedAvroExample.java | 4 ++-- .../streams/WikipediaFeedAvroExampleDriver.java | 2 +- .../streams/WikipediaFeedAvroLambdaExample.java | 4 ++-- .../examples/streams/WordCountLambdaExample.java | 2 +- .../WordCountInteractiveQueriesDriver.java | 2 +- .../WordCountInteractiveQueriesExample.java | 4 ++-- .../kafkamusic/KafkaMusicExample.java | 4 ++-- .../kafkamusic/KafkaMusicExampleDriver.java | 2 +- .../examples/streams/microservices/README.md | 2 +- .../examples/streams/MapFunctionScalaExample.scala | 2 +- .../examples/streams/WordCountScalaExample.scala | 2 +- 27 files changed, 45 insertions(+), 45 deletions(-) diff --git a/README.md b/README.md index e57db7d529..365beeb2e9 100644 --- a/README.md +++ b/README.md @@ -271,7 +271,7 @@ Kafka Streams examples via: # $ mvn clean package -# >>> Creates target/kafka-streams-examples-5.1.3-SNAPSHOT-standalone.jar +# >>> Creates target/kafka-streams-examples-5.1.4-standalone.jar ``` You can now run the example applications as follows: @@ -279,7 +279,7 @@ You can now run the example applications as follows: ```shell # Run an example application from the standalone jar. # Here: `WordCountLambdaExample` -$ java -cp target/kafka-streams-examples-5.1.3-SNAPSHOT-standalone.jar \ +$ java -cp target/kafka-streams-examples-5.1.4-standalone.jar \ io.confluent.examples.streams.WordCountLambdaExample ``` @@ -295,7 +295,7 @@ If you want to turn on log4j while running your example application, you can edi ```shell # Run an example application from the standalone jar. 
# Here: `WordCountLambdaExample` -$ java -cp target/kafka-streams-examples-5.1.3-SNAPSHOT-standalone.jar \ +$ java -cp target/kafka-streams-examples-5.1.4-standalone.jar \ -Dlog4j.configuration=file:src/main/resources/log4j.properties \ io.confluent.examples.streams.WordCountLambdaExample ``` diff --git a/docker-compose.yml b/docker-compose.yml index d311583028..cb5c05a761 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: '2' services: zookeeper: - image: confluentinc/cp-zookeeper:5.1.4-SNAPSHOT + image: confluentinc/cp-zookeeper:5.1.4 hostname: zookeeper ports: - '32181:32181' @@ -13,7 +13,7 @@ services: - "moby:127.0.0.1" kafka: - image: confluentinc/cp-enterprise-kafka:5.1.4-SNAPSHOT + image: confluentinc/cp-enterprise-kafka:5.1.4 hostname: kafka ports: - '9092:9092' @@ -38,7 +38,7 @@ services: - "moby:127.0.0.1" schema-registry: - image: confluentinc/cp-schema-registry:5.1.4-SNAPSHOT + image: confluentinc/cp-schema-registry:5.1.4 hostname: schema-registry depends_on: - zookeeper @@ -54,7 +54,7 @@ services: # This "container" is a workaround to pre-create topics for the Kafka Music application # until we have a more elegant way to do that. kafka-create-topics: - image: confluentinc/cp-kafka:5.1.4-SNAPSHOT + image: confluentinc/cp-kafka:5.1.4 depends_on: - kafka hostname: kafka-create-topics @@ -78,7 +78,7 @@ services: # Continuously generates input data for the Kafka Music application. kafka-music-data-generator: - image: confluentinc/kafka-streams-examples:5.1.4-SNAPSHOT + image: confluentinc/kafka-streams-examples:5.1.4 hostname: kafka-music-data-generator depends_on: - kafka @@ -89,7 +89,7 @@ services: cub kafka-ready -b kafka:29092 1 20 && \ echo Waiting for Confluent Schema Registry to be ready... && \ cub sr-ready schema-registry 8081 20 && \ - java -cp /usr/share/java/kafka-streams-examples/kafka-streams-examples-5.0.0-standalone.jar \ + java -cp /usr/share/java/kafka-streams-examples/kafka-streams-examples-5.1.4-standalone.jar \ io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExampleDriver \ kafka:29092 http://schema-registry:8081'" environment: @@ -103,7 +103,7 @@ services: # Runs the Kafka Music application. kafka-music-application: - image: confluentinc/kafka-streams-examples:5.1.4-SNAPSHOT + image: confluentinc/kafka-streams-examples:5.1.4 hostname: kafka-music-application depends_on: - kafka diff --git a/pom.xml b/pom.xml index 37177e47df..09d4f6d388 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ io.confluent rest-utils-parent - 5.1.4-SNAPSHOT + 5.1.4 kafka-streams-examples @@ -54,7 +54,7 @@ false false 1.8 - 5.1.4-SNAPSHOT + 5.1.4 UTF-8 ${kafka.scala.version}.8 2.2.6 diff --git a/src/main/java/io/confluent/examples/streams/AnomalyDetectionLambdaExample.java b/src/main/java/io/confluent/examples/streams/AnomalyDetectionLambdaExample.java index ee784fb800..9a1cd4cb21 100644 --- a/src/main/java/io/confluent/examples/streams/AnomalyDetectionLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/AnomalyDetectionLambdaExample.java @@ -58,7 +58,7 @@ * Once packaged you can then run: *

  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.AnomalyDetectionLambdaExample
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.AnomalyDetectionLambdaExample
  * }
*

* 4) Write some input data to the source topic (e.g. via {@code kafka-console-producer}). The already diff --git a/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java b/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java index d3d6e1af5d..cc64ac6a74 100644 --- a/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java @@ -60,7 +60,7 @@ * Once packaged you can then run: *

  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.MapFunctionLambdaExample
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.MapFunctionLambdaExample
  * }
  * 
* 4) Write some input data to the source topic (e.g. via {@code kafka-console-producer}). The already diff --git a/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java b/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java index 022ee35392..3f6e666574 100644 --- a/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java +++ b/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java @@ -79,7 +79,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.PageViewRegionLambdaExample
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.PageViewRegionExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link PageViewRegionExampleDriver}). The @@ -89,7 +89,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via `Ctrl-C`. - * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.PageViewRegionExampleDriver + * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.PageViewRegionExampleDriver * } * * 5) Inspect the resulting data in the output topic, e.g. via {@code kafka-console-consumer}. diff --git a/src/main/java/io/confluent/examples/streams/PageViewRegionExampleDriver.java b/src/main/java/io/confluent/examples/streams/PageViewRegionExampleDriver.java index 7cc877dd4f..d8e979c599 100644 --- a/src/main/java/io/confluent/examples/streams/PageViewRegionExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/PageViewRegionExampleDriver.java @@ -49,7 +49,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.PageViewRegionExampleDriver
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.PageViewRegionExampleDriver
  * }
  * 
* You should terminate with {@code Ctrl-C}. diff --git a/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java b/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java index 64205293a5..4f9a915640 100644 --- a/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java @@ -75,7 +75,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.PageViewRegionLambdaExample
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.PageViewRegionLambdaExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link PageViewRegionExampleDriver}). @@ -85,7 +85,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via `Ctrl-C`. - * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.PageViewRegionExampleDriver + * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.PageViewRegionExampleDriver * } * * 5) Inspect the resulting data in the output topic, e.g. via {@code kafka-console-consumer}. diff --git a/src/main/java/io/confluent/examples/streams/SecureKafkaStreamsExample.java b/src/main/java/io/confluent/examples/streams/SecureKafkaStreamsExample.java index c0869bd5e5..a904a8300a 100644 --- a/src/main/java/io/confluent/examples/streams/SecureKafkaStreamsExample.java +++ b/src/main/java/io/confluent/examples/streams/SecureKafkaStreamsExample.java @@ -100,7 +100,7 @@ * [vagrant@kafka ~]$ mvn clean -DskipTests=true package * * # Now we can start this example application - * [vagrant@kafka ~]$ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar \ + * [vagrant@kafka ~]$ java -cp target/kafka-streams-examples-5.1.4-standalone.jar \ * io.confluent.examples.streams.SecureKafkaStreamsExample * } * diff --git a/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java b/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java index 0982e7092b..eb934390b7 100644 --- a/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java +++ b/src/main/java/io/confluent/examples/streams/SessionWindowsExample.java @@ -70,7 +70,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.SessionWindowsExample
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.SessionWindowsExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link SessionWindowsExampleDriver}). The @@ -80,7 +80,7 @@ * {@code * # Here: Write input data using the example driver. The driver will also consume, and print, the data from the output * topic. The driver will stop when it has received all output records - * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.SessionWindowsExampleDriver + * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.SessionWindowsExampleDriver * } * * You should see output data similar to: diff --git a/src/main/java/io/confluent/examples/streams/SessionWindowsExampleDriver.java b/src/main/java/io/confluent/examples/streams/SessionWindowsExampleDriver.java index 2a9932f53d..e69b33fab5 100644 --- a/src/main/java/io/confluent/examples/streams/SessionWindowsExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/SessionWindowsExampleDriver.java @@ -42,7 +42,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.SessionWindowsExampleDriver
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.SessionWindowsExampleDriver
  * }
  * 
*/ diff --git a/src/main/java/io/confluent/examples/streams/SumLambdaExample.java b/src/main/java/io/confluent/examples/streams/SumLambdaExample.java index de25b3f8cf..5e9e8f117e 100644 --- a/src/main/java/io/confluent/examples/streams/SumLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/SumLambdaExample.java @@ -53,7 +53,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.SumLambdaExample
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.SumLambdaExample
  * }
  * 
* 4) Write some input data to the source topic (e.g. via {@link SumLambdaExampleDriver}). The @@ -63,7 +63,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via `Ctrl-C`. - * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.SumLambdaExampleDriver + * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.SumLambdaExampleDriver * } * * 5) Inspect the resulting data in the output topics, e.g. via {@code kafka-console-consumer}. diff --git a/src/main/java/io/confluent/examples/streams/SumLambdaExampleDriver.java b/src/main/java/io/confluent/examples/streams/SumLambdaExampleDriver.java index 589ffeaae1..0732888121 100644 --- a/src/main/java/io/confluent/examples/streams/SumLambdaExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/SumLambdaExampleDriver.java @@ -41,7 +41,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.SumLambdaExampleDriver
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.SumLambdaExampleDriver
  * }
  * 
* You should terminate with {@code Ctrl-C}. diff --git a/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java b/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java index 01095457eb..9c2eb280fb 100644 --- a/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/TopArticlesExampleDriver.java @@ -52,7 +52,7 @@ * Once packaged you can then run: *
  * {@code
- * java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.TopArticlesLambdaExample
+ * java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.TopArticlesExampleDriver
  * }
  * 
* diff --git a/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java b/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java index 4dd8f6ccb4..c947250e79 100644 --- a/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java @@ -81,7 +81,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.TopArticlesLambdaExample
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.TopArticlesLambdaExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link TopArticlesExampleDriver}). @@ -92,7 +92,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via Ctrl-C. - * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.TopArticlesExampleDriver + * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.TopArticlesExampleDriver * } * */ diff --git a/src/main/java/io/confluent/examples/streams/UserRegionLambdaExample.java b/src/main/java/io/confluent/examples/streams/UserRegionLambdaExample.java index c57c171615..77719eb394 100644 --- a/src/main/java/io/confluent/examples/streams/UserRegionLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/UserRegionLambdaExample.java @@ -54,7 +54,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.UserRegionLambdaExample
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.UserRegionLambdaExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@code kafka-console-producer}). The already diff --git a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java index 1acb8ca9bc..ec845cba45 100644 --- a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java +++ b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExample.java @@ -58,7 +58,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExample
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link WikipediaFeedAvroExampleDriver}). @@ -69,7 +69,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via Ctrl-C. - * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver + * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver * } * */ diff --git a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExampleDriver.java b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExampleDriver.java index 414513c8e5..044e612748 100644 --- a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroExampleDriver.java @@ -46,7 +46,7 @@ * Once packaged you can then run: *
  * {@code
- * java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver
+ * java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver
  * }
  * 
* diff --git a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java index 23f27a3348..5fc961af14 100644 --- a/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/WikipediaFeedAvroLambdaExample.java @@ -55,7 +55,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroLambdaExample
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroLambdaExample
  * }
  * 
* 4) Write some input data to the source topics (e.g. via {@link WikipediaFeedAvroExampleDriver}). @@ -66,7 +66,7 @@ * {@code * # Here: Write input data using the example driver. Once the driver has stopped generating data, * # you can terminate it via Ctrl-C. - * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver + * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.WikipediaFeedAvroExampleDriver * } * */ diff --git a/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java b/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java index dfc536cb26..32af7e1c6e 100644 --- a/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java +++ b/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java @@ -62,7 +62,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.WordCountLambdaExample
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.WordCountLambdaExample
  * }
  * 
* 4) Write some input data to the source topic "streams-plaintext-input" (e.g. via {@code kafka-console-producer}). diff --git a/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesDriver.java b/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesDriver.java index 12de211b1a..4a5befa761 100644 --- a/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesDriver.java +++ b/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesDriver.java @@ -36,7 +36,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.interactivequeries.WordCountInteractiveQueriesDriver
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.interactivequeries.WordCountInteractiveQueriesDriver
  * }
  * 
* You should terminate with Ctrl-C diff --git a/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java b/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java index 6b3130cde5..709f18fae8 100644 --- a/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java +++ b/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java @@ -83,7 +83,7 @@ * *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar \
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar \
  *      io.confluent.examples.streams.interactivequeries.WordCountInteractiveQueriesExample 7070
  * }
  * 
@@ -94,7 +94,7 @@ * *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar \
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar \
  *      io.confluent.examples.streams.interactivequeries.WordCountInteractiveQueriesExample 7071
  * }
  * 
diff --git a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java index 7ccce83d2b..21db420623 100644 --- a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java +++ b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java @@ -110,7 +110,7 @@ * *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar \
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar \
  *      io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExample 7070
  * }
  * 
@@ -121,7 +121,7 @@ * *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar \
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar \
  *      io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExample 7071
  * }
  * 
diff --git a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleDriver.java b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleDriver.java index e270c44823..f68ad168f4 100644 --- a/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleDriver.java +++ b/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExampleDriver.java @@ -52,7 +52,7 @@ * Once packaged you can then run: *
  * {@code
- * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar \
+ * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar \
  *      io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExampleDriver
  * }
  * 
diff --git a/src/main/java/io/confluent/examples/streams/microservices/README.md b/src/main/java/io/confluent/examples/streams/microservices/README.md index f2aacec035..8e1d8d19f4 100644 --- a/src/main/java/io/confluent/examples/streams/microservices/README.md +++ b/src/main/java/io/confluent/examples/streams/microservices/README.md @@ -34,7 +34,7 @@ Then run the fully-working demo [end-to-end](https://docs.confluent.io/current/t It runs the ecosystem and all the microservices for you including Kafka Connect, Elasticsearch, KSQL and Control Center. To play with this ecosystem the simplest way is to run the tests and fiddle with the code (stand alone execution is only supported in branch 5.0.0+ so go there if you want stand alone or docker support). Each test boots a self-contained Kafka cluster so it's easy to play with different queries and configurations. -The best place to start is [EndToEndTest.java](https://github.com/confluentinc/kafka-streams-examples/blob/5.1.4-SNAPSHOT-post/src/test/java/io/confluent/examples/streams/microservices/EndToEndTest.java) +The best place to start is [EndToEndTest.java](https://github.com/confluentinc/kafka-streams-examples/blob/5.1.x/src/test/java/io/confluent/examples/streams/microservices/EndToEndTest.java) # Running the Examples: * Requires Java 1.8 diff --git a/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala b/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala index 860ef789ef..9936df4006 100644 --- a/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala +++ b/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala @@ -55,7 +55,7 @@ import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} * Once packaged you can then run: * * {{{ - * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.MapFunctionScalaExample + * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.MapFunctionScalaExample * }}} * * 4) Write some input data to the source topics (e.g. via `kafka-console-producer`). The already diff --git a/src/main/scala/io/confluent/examples/streams/WordCountScalaExample.scala b/src/main/scala/io/confluent/examples/streams/WordCountScalaExample.scala index 9802687f84..d7aee15857 100644 --- a/src/main/scala/io/confluent/examples/streams/WordCountScalaExample.scala +++ b/src/main/scala/io/confluent/examples/streams/WordCountScalaExample.scala @@ -43,7 +43,7 @@ import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} * Once packaged you can then run: * * {{{ - * $ java -cp target/kafka-streams-examples-5.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.WordCountLambdaExample + * $ java -cp target/kafka-streams-examples-5.1.4-standalone.jar io.confluent.examples.streams.WordCountScalaExample * }}} * * 4) Write some input data to the source topic "streams-plaintext-input" (e.g. via `kafka-console-producer`).
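
Note on the mock Avro serde helpers deleted earlier in this series: all three followed the same pattern, wrapping Confluent's KafkaAvroSerializer and KafkaAvroDeserializer around a MockSchemaRegistryClient that is injected either through a constructor or through the "mock.schema.registry.client" config key. Below is a minimal sketch of how a test could have wired them up before their removal; the class name MockSerdeWiringSketch and the dummy registry URL are illustrative assumptions, not taken from the deleted sources.

```java
import io.confluent.examples.streams.utils.MockGenericAvroSerde;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;

import java.util.HashMap;
import java.util.Map;

public class MockSerdeWiringSketch { // hypothetical test helper, not part of this repo

  public static void main(final String[] args) {
    // One mock registry shared by serializer and deserializer, so a schema
    // registered on write can be looked up again on read.
    final MockSchemaRegistryClient registry = new MockSchemaRegistryClient();

    // Path 1: inject the client through the public constructor.
    final Serde<GenericRecord> serde = new MockGenericAvroSerde(registry);

    // Path 2: inject it through configure(); this is the
    // "mock.schema.registry.client" lookup in the deleted configure() methods.
    // The wrapped KafkaAvroSerializer still validates its own config, which
    // requires a schema.registry.url entry, so a dummy value is supplied.
    final Map<String, Object> configs = new HashMap<>();
    configs.put("mock.schema.registry.client", registry);
    configs.put("schema.registry.url", "http://localhost:8081"); // dummy, never contacted
    serde.configure(configs, false); // isKey = false
  }
}
```

The single shared MockSchemaRegistryClient is the design point: schemas registered during serialization are resolvable during deserialization, so tests can round-trip GenericRecord values without a running Schema Registry.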