diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java index f8d1c87838ff7..445d13e8480f0 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java @@ -169,7 +169,10 @@ public static StreamApplication createStreamApplication(Config config) { Class builderClass = Class.forName(appClassName); return (StreamApplication) builderClass.newInstance(); } catch (Throwable t) { - throw new ConfigException(String.format("%s is not a StreamApplication.", appClassName)); + String errorMsg = String.format("Failed to create StreamApplication class from the config. %s = %s", + StreamApplication.APP_CLASS_CONFIG, config.get(StreamApplication.APP_CLASS_CONFIG)); + log.error(errorMsg, t); + throw new ConfigException(errorMsg, t); } } else { return null; diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java b/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java new file mode 100644 index 0000000000000..40e5e30a253d2 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.test.operator; + +class PageView { + private final String userId; + private final String country; + private final String url; + + /** + * Constructs a {@link PageView} from the provided string. + * + * @param message in the following CSV format - userId,country,url + */ + PageView(String message) { + String[] pageViewFields = message.split(","); + userId = pageViewFields[0]; + country = pageViewFields[1]; + url = pageViewFields[2]; + } + + String getUserId() { + return userId; + } + + String getCountry() { + return country; + } + + String getUrl() { + return url; + } +} diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java new file mode 100644 index 0000000000000..1e2acb258d8e3 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.operator; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collection; +import java.util.function.Function; + +/** + * A {@link StreamApplication} that demonstrates a repartition followed by a windowed count. + */ +public class RepartitionWindowApp implements StreamApplication { + + private static final Logger LOG = LoggerFactory.getLogger(RepartitionWindowApp.class); + + @Override + public void init(StreamGraph graph, Config config) { + + MessageStream pageViews = graph.getInputStream("page-views", (k, v) -> v); + Function keyFn = pageView -> new PageView(pageView).getUserId(); + + OutputStream>> outputStream = graph + .getOutputStream(TestRepartitionWindowApp.OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString()); + + pageViews + .partitionBy(keyFn) + .window(Windows.keyedSessionWindow(keyFn, Duration.ofSeconds(3))) + .sendTo(outputStream); + } +} diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java new file mode 100644 index 0000000000000..65b48d390e595 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.operator; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collection; + +/** + * A {@link StreamApplication} that demonstrates a filter followed by a session window. + */ +public class SessionWindowApp implements StreamApplication { + + private static final Logger LOG = LoggerFactory.getLogger(SessionWindowApp.class); + private static final String FILTER_KEY = "badKey"; + private static final String OUTPUT_TOPIC = "Result"; + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream pageViews = graph.getInputStream("page-views", (k, v) -> new PageView(v)); + OutputStream>> outputStream = graph + .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString()); + + pageViews + .filter(m -> !FILTER_KEY.equals(m.getUserId())) + .window(Windows.keyedSessionWindow(pageView -> pageView.getUserId(), Duration.ofSeconds(3))) + .sendTo(outputStream); + } +} diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java new file mode 100644 index 0000000000000..9bb66ad17f631 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.test.operator; + +import kafka.utils.TestUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.test.harness.AbstractIntegrationTestHarness; +import scala.Option; +import scala.Option$; + +import java.io.File; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Harness for writing integration tests for {@link StreamApplication}s. + * + *

This provides the following features for its sub-classes: + *

    + *
  • + * Automatic Setup and teardown: Any non-trivial integration test brings up components like Zookeeper + * servers, Kafka brokers, Kafka producers and Kafka consumers. This harness initializes each + * of these components in {@link #setUp()} and shuts down each of them cleanly in {@link #tearDown()}. + * {@link #setUp()} and {@link #tearDown()} are automatically invoked from the Junit runner. + *
  • + * Interaction with Kafka: The harness provides convenience methods to interact with Kafka brokers, consumers + * and producers - for instance, methods to create topics, produce and consume messages. + *
  • + * Config defaults: Often Samza integration tests have to setup config boiler plate + * to perform even simple tasks like producing and consuming messages. This harness provides default string + * serdes for producing / consuming messages and a default system-alias named "kafka" that uses the + * {@link org.apache.samza.system.kafka.KafkaSystemFactory}. + *
  • + * Debugging: At times, it is convenient to debug integration tests locally from an IDE. This harness + * runs all its components (including Kafka brokers, Zookeeper servers and Samza) locally. + *
+ * + *

Implementation Notes:
+ * State persistence: {@link #tearDown()} clears all associated state (including topics and metadata) in Kafka and + * Zookeeper. Hence, the state is not durable across invocations of {@link #tearDown()}
+ * + * Execution model: {@link StreamApplication}s are run as their own {@link org.apache.samza.job.local.ThreadJob}s. + * Similarly, embedded Kafka servers and Zookeeper servers are run as their own threads. + * {@link #produceMessage(String, int, String, String)} and {@link #consumeMessages(Collection, int)} are blocking calls. + * + *

Usage Example

+ * Here is an actual test that publishes a message into Kafka, runs an application, and verifies consumption + * from the output topic. + * + *
 {@code
+ * class MyTest extends StreamApplicationIntegrationTestHarness {
+ *   private final StreamApplication myApp = new MyStreamApplication();
+ *   private final Collection outputTopics = Collections.singletonList("output-topic");
+ *   @Test
+ *   public void test() {
+ *     createTopic("mytopic", 1);
+ *     produceMessage("mytopic", 0, "key1", "val1");
+ *     runApplication(myApp, "myApp", null);
+ *     List> messages = consumeMessages(outputTopics)
+ *     Assert.assertEquals(messages.size(), 1);
+ *   }
+ * }}
+ */ +public class StreamApplicationIntegrationTestHarness extends AbstractIntegrationTestHarness { + private KafkaProducer producer; + private KafkaConsumer consumer; + private StreamApplication app; + private ApplicationRunner runner; + + private int numEmptyPolls = 3; + private static final Duration POLL_TIMEOUT_MS = Duration.ofSeconds(20); + private static final String DEFAULT_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + + /** + * Starts a single kafka broker, and a single embedded zookeeper server in their own threads. + * Sub-classes should invoke {@link #zkConnect()} and {@link #bootstrapUrl()}s to + * obtain the urls (and ports) of the started zookeeper and kafka broker. + */ + @Override + public void setUp() { + super.setUp(); + + Properties consumerDeserializerProperties = new Properties(); + consumerDeserializerProperties.setProperty("key.deserializer", DEFAULT_DESERIALIZER); + consumerDeserializerProperties.setProperty("value.deserializer", DEFAULT_DESERIALIZER); + + producer = TestUtils.createNewProducer( + bootstrapServers(), // bootstrap-server url + 1, // acks + 60 * 1000L, // maxBlockMs + 1024L * 1024L, // buffer size + 0, // numRetries + 0L, // lingerMs + 5 * 1000L, // requestTimeout + SecurityProtocol.PLAINTEXT, + null, + Option.apply(new Properties()), + new StringSerializer(), + new StringSerializer(), + Option.apply(new Properties())); + + consumer = TestUtils.createNewConsumer( + bootstrapServers(), + "group", // groupId + "earliest", // auto-offset-reset + 4096L, // per-partition fetch size + "org.apache.kafka.clients.consumer.RangeAssignor", // partition Assigner + 30000, + SecurityProtocol.PLAINTEXT, + Option$.MODULE$.empty(), + Option$.MODULE$.empty(), + Option$.MODULE$.apply(consumerDeserializerProperties)); + } + + /** + * Creates a kafka topic with the provided name and the number of partitions + * @param topicName the name of the topic + * @param numPartitions the number of partitions in the topic + */ + public void createTopic(String topicName, int numPartitions) { + TestUtils.createTopic(zkUtils(), topicName, numPartitions, 1, servers(), new Properties()); + } + + /** + * Produces a message to the provided topic partition. + * @param topicName the topic to produce messages to + * @param partitionId the topic partition to produce messages to + * @param key the key in the message + * @param val the value in the message + */ + public void produceMessage(String topicName, int partitionId, String key, String val) { + producer.send(new ProducerRecord(topicName, partitionId, key, val)); + producer.flush(); + } + + @Override + public int clusterSize() { + return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR()); + } + + + /** + * Read messages from the provided list of topics until {@param threshold} messages have been read or until + * {@link #numEmptyPolls} polls return no messages. + * + * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and the number of empty polls are + * determined by {@link #numEmptyPolls} + * + * @param topics the list of topics to consume from + * @param threshold the number of messages to consume + * @return the list of {@link ConsumerRecord}s whose size can be atmost {@param threshold} + */ + public List> consumeMessages(Collection topics, int threshold) { + int emptyPollCount = 0; + List> recordList = new ArrayList<>(); + consumer.subscribe(topics); + + while (emptyPollCount < numEmptyPolls && recordList.size() < threshold) { + ConsumerRecords records = consumer.poll(POLL_TIMEOUT_MS.toMillis()); + if (!records.isEmpty()) { + Iterator> iterator = records.iterator(); + while (iterator.hasNext() && recordList.size() < threshold) { + ConsumerRecord record = iterator.next(); + recordList.add(record); + emptyPollCount = 0; + } + } else { + emptyPollCount++; + } + } + return recordList; + } + + /** + * Executes the provided {@link StreamApplication} as a {@link org.apache.samza.job.local.ThreadJob}. The + * {@link StreamApplication} runs in its own separate thread. + * + * @param streamApplication the application to run + * @param appName the name of the application + * @param overriddenConfigs configs to override + */ + public void runApplication(StreamApplication streamApplication, String appName, Config overriddenConfigs) { + + Map configs = new HashMap<>(); + configs.put("job.factory.class", "org.apache.samza.job.local.ThreadJobFactory"); + configs.put("job.name", appName); + configs.put("app.class", streamApplication.getClass().getCanonicalName()); + configs.put("serializers.registry.json.class", "org.apache.samza.serializers.JsonSerdeFactory"); + configs.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory"); + configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory"); + configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect()); + configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl()); + configs.put("systems.kafka.samza.key.serde", "string"); + configs.put("systems.kafka.samza.msg.serde", "string"); + configs.put("systems.kafka.samza.offset.default", "oldest"); + configs.put("job.coordinator.system", "kafka"); + configs.put("job.default.system", "kafka"); + configs.put("job.coordinator.replication.factor", "1"); + configs.put("task.window.ms", "1000"); + + if (overriddenConfigs != null) { + configs.putAll(overriddenConfigs); + } + + app = streamApplication; + runner = ApplicationRunner.fromConfig(new MapConfig(configs)); + runner.run(streamApplication); + } + + public void setNumEmptyPolls(int numEmptyPolls) { + this.numEmptyPolls = numEmptyPolls; + } + + /** + * Shutdown and clear Zookeeper and Kafka broker state. + */ + @Override + public void tearDown() { + super.tearDown(); + } +} diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java new file mode 100644 index 0000000000000..57522eb886ca3 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.test.operator; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +/** + * Test driver for {@link RepartitionWindowApp}. + */ +public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHarness { + + static final String INPUT_TOPIC = "page-views"; + static final String OUTPUT_TOPIC = "Result"; + private static final String APP_NAME = "RepartitionedSessionizer"; + + @Test + public void testRepartitionedSessionWindowCounter() throws Exception { + // create topics + createTopic(INPUT_TOPIC, 3); + createTopic(OUTPUT_TOPIC, 1); + + // produce messages to different partitions. + produceMessage(INPUT_TOPIC, 0, "userId1", "userId1,india,5.com"); + produceMessage(INPUT_TOPIC, 1, "userId2", "userId2,china,4.com"); + produceMessage(INPUT_TOPIC, 2, "userId1", "userId1,india,1.com"); + produceMessage(INPUT_TOPIC, 0, "userId1", "userId1,india,2.com"); + produceMessage(INPUT_TOPIC, 1, "userId1", "userId1,india,3.com"); + + // run the application + RepartitionWindowApp app = new RepartitionWindowApp(); + runApplication(app, APP_NAME, null); + + // consume and validate result + List> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2); + Assert.assertEquals(messages.size(), 2); + + for (ConsumerRecord message : messages) { + String key = message.key(); + String value = message.value(); + // Assert that there are 4 messages for userId1 and 1 message for userId2. + Assert.assertTrue(key.equals("userId1") || key.equals("userId2")); + if ("userId1".equals(key)) { + Assert.assertEquals(value, "4"); + } else { + Assert.assertEquals(value, "1"); + } + } + } +} diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java new file mode 100644 index 0000000000000..ae2608dab1858 --- /dev/null +++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.test.operator; + +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.OutputStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collection; + +/** + * A {@link StreamApplication} that demonstrates a filter followed by a tumbling window. + */ +public class TumblingWindowApp implements StreamApplication { + + private static final Logger LOG = LoggerFactory.getLogger(TumblingWindowApp.class); + private static final String FILTER_KEY = "badKey"; + private static final String OUTPUT_TOPIC = "Result"; + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream pageViews = graph.getInputStream("page-views", (k, v) -> new PageView(v)); + OutputStream>> outputStream = graph + .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString()); + + pageViews + .filter(m -> !FILTER_KEY.equals(m.getUserId())) + .window(Windows.keyedTumblingWindow(pageView -> pageView.getUserId(), Duration.ofSeconds(3))) + .sendTo(outputStream); + } +} diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java index 070e7a79c3262..861af16cc437e 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java @@ -55,6 +55,7 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness { * Testing a basic identity stream task - reads data from a topic and writes it to another topic * (without any modifications) * + *

* The standalone version in this test uses KafkaSystemFactory and it uses a SingleContainerGrouperFactory. Hence, * no matter how many tasks are present, it will always be run in a single processor instance. This simplifies testing */