Skip to content

Commit

Permalink
SAMZA-1170: Integration testing harness for StreamApplications
Browse files Browse the repository at this point in the history
- Added an integration testing harness for testing `StreamApplication`s. This is convenient for running and interacting with Kafka brokers, Zk servers and Samza components locally.
- Added the following integration tests that use this harness to test actual `StreamApplication`s:
1. Keyed Tumbling Window
2. Keyed Session Window
3. Repartition with Keyed Session Window

Bonus: A couple of additional bug-fixes that were blockers for this test.

Author: vjagadish1989 <jvenkatr@linkedin.com>

Reviewers: Prateek Maheshwari<prateekm@linkedin.com>, Jacob Maes<jmaes@linkedin.com>

Closes apache#96 from vjagadish1989/integration-tests
  • Loading branch information
jagadish-northguard committed Apr 12, 2017
1 parent 260414d commit 8a18a70
Show file tree
Hide file tree
Showing 8 changed files with 545 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ public static StreamApplication createStreamApplication(Config config) {
Class<?> builderClass = Class.forName(appClassName);
return (StreamApplication) builderClass.newInstance();
} catch (Throwable t) {
throw new ConfigException(String.format("%s is not a StreamApplication.", appClassName));
String errorMsg = String.format("Failed to create StreamApplication class from the config. %s = %s",
StreamApplication.APP_CLASS_CONFIG, config.get(StreamApplication.APP_CLASS_CONFIG));
log.error(errorMsg, t);
throw new ConfigException(errorMsg, t);
}
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.operator;

class PageView {
private final String userId;
private final String country;
private final String url;

/**
* Constructs a {@link PageView} from the provided string.
*
* @param message in the following CSV format - userId,country,url
*/
PageView(String message) {
String[] pageViewFields = message.split(",");
userId = pageViewFields[0];
country = pageViewFields[1];
url = pageViewFields[2];
}

String getUserId() {
return userId;
}

String getCountry() {
return country;
}

String getUrl() {
return url;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.test.operator;

import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collection;
import java.util.function.Function;

/**
* A {@link StreamApplication} that demonstrates a repartition followed by a windowed count.
*/
public class RepartitionWindowApp implements StreamApplication {

private static final Logger LOG = LoggerFactory.getLogger(RepartitionWindowApp.class);

@Override
public void init(StreamGraph graph, Config config) {

MessageStream<String> pageViews = graph.<String, String, String>getInputStream("page-views", (k, v) -> v);
Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();

OutputStream<String, String, WindowPane<String, Collection<String>>> outputStream = graph
.getOutputStream(TestRepartitionWindowApp.OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString());

pageViews
.partitionBy(keyFn)
.window(Windows.keyedSessionWindow(keyFn, Duration.ofSeconds(3)))
.sendTo(outputStream);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.test.operator;

import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collection;

/**
* A {@link StreamApplication} that demonstrates a filter followed by a session window.
*/
public class SessionWindowApp implements StreamApplication {

private static final Logger LOG = LoggerFactory.getLogger(SessionWindowApp.class);
private static final String FILTER_KEY = "badKey";
private static final String OUTPUT_TOPIC = "Result";

@Override
public void init(StreamGraph graph, Config config) {
MessageStream<PageView> pageViews = graph.<String, String, PageView>getInputStream("page-views", (k, v) -> new PageView(v));
OutputStream<String, String, WindowPane<String, Collection<PageView>>> outputStream = graph
.getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString());

pageViews
.filter(m -> !FILTER_KEY.equals(m.getUserId()))
.window(Windows.keyedSessionWindow(pageView -> pageView.getUserId(), Duration.ofSeconds(3)))
.sendTo(outputStream);
}
}
Loading

0 comments on commit 8a18a70

Please sign in to comment.