Skip to content

Commit

Permalink
Rewrite demo applications with the Pipeline API (hazelcast#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
Emin Demirci authored Feb 26, 2018
1 parent 452509d commit 1f4c824
Show file tree
Hide file tree
Showing 13 changed files with 334 additions and 520 deletions.
136 changes: 41 additions & 95 deletions cryptocurrency-realtime-trend/src/main/java/JetCoinTrend.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,28 @@
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.core.AppendableTraverser;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.TimestampKind;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.WatermarkGenerationParams;
import com.hazelcast.jet.core.processor.DiagnosticProcessors;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.datamodel.TimestampedEntry;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedToLongFunction;
import com.hazelcast.jet.pipeline.SlidingWindowDef;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.StreamStage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.LockSupport;

import static com.hazelcast.jet.Util.entry;
import static com.hazelcast.jet.aggregate.AggregateOperations.allOf;
import static com.hazelcast.jet.aggregate.AggregateOperations.averagingDouble;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.core.Edge.between;
import static com.hazelcast.jet.core.Edge.from;
import static com.hazelcast.jet.core.ProcessorSupplier.of;
import static com.hazelcast.jet.core.WatermarkEmissionPolicy.emitByFrame;
import static com.hazelcast.jet.core.WatermarkGenerationParams.wmGenParams;
import static com.hazelcast.jet.core.WatermarkPolicies.limitingLag;
import static com.hazelcast.jet.core.processor.Processors.aggregateToSlidingWindowP;
import static com.hazelcast.jet.core.processor.Processors.insertWatermarksP;
import static com.hazelcast.jet.core.processor.SinkProcessors.writeMapP;
import static com.hazelcast.jet.function.DistributedFunctions.entryKey;
import static com.hazelcast.jet.pipeline.Sinks.map;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;
import static com.hazelcast.util.ExceptionUtil.rethrow;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class JetCoinTrend {
Expand All @@ -58,101 +43,62 @@ public class JetCoinTrend {
System.setProperty("hazelcast.logging.type", "log4j");
}

public static void main(String[] args) throws Exception {
public static void main(String[] args) {
System.out.println("DISCLAIMER: This is not an investment advice");

DAG dag = buildDag();
Pipeline pipeline = buildPipeline();
// Start Jet
JetInstance jet = Jet.newJetInstance();
startConsolePrinterThread(jet);
try {
// Perform the computation
jet.newJob(dag).join();
jet.newJob(pipeline).join();
} finally {
running = false;
Jet.shutdownAll();
}
}

private static DAG buildDag() {
DAG dag = new DAG();

private static Pipeline buildPipeline() {
Pipeline pipeline = Pipeline.create();
Properties properties = loadProperties();
List<String> terms = loadTerms();
Vertex twitterSource = dag.newVertex("twitter", StreamTwitterP.streamTwitterP(properties, terms));
Vertex relevance = dag.newVertex("relevance", Processors.<TimestampedEntry<Object, String>,
TimestampedEntry<String, String>>flatMapP(JetCoinTrend::flatMapToRelevant));
Vertex sentiment = dag.newVertex("sentiment", of(SentimentProcessor::new));

SlidingWindowDef slidingWindowOf30Sec = sliding(30_000, 10_000);
SlidingWindowDef slidingWindowOf1Min = sliding(60_000, 10_000);
SlidingWindowDef slidingWindowOf5Min = sliding(300_000, 10_000);

WatermarkGenerationParams<TimestampedEntry<String, Double>> params = wmGenParams(
TimestampedEntry::getTimestamp,
limitingLag(5000),
emitByFrame(slidingWindowOf30Sec.toSlidingWindowPolicy()), 60000
);
Vertex insertWm = dag.newVertex("insertWm", insertWatermarksP(params)).localParallelism(1);

AggregateOperation1<TimestampedEntry<String, Double>, ?, Tuple2<Double, Long>> aggrOp =
AggregateOperations.allOf(averagingDouble(TimestampedEntry::getValue), counting());


DistributedFunction<TimestampedEntry, Object> getKeyFn = TimestampedEntry::getKey;
DistributedToLongFunction<TimestampedEntry> getTimeStampFn = TimestampedEntry::getTimestamp;
Vertex slidingWin30sec = dag.newVertex("slidingWin30Sec", aggregateToSlidingWindowP(
singletonList(getKeyFn),
singletonList(getTimeStampFn),
TimestampKind.EVENT,
slidingWindowOf30Sec.toSlidingWindowPolicy(),
aggrOp,
(ignored, timestamp, key, value) -> new TimestampedEntry<>(ignored, timestamp, key, value)
));

Vertex slidingWin1min = dag.newVertex("slidingWin1Min", aggregateToSlidingWindowP(
singletonList(getKeyFn),
singletonList(getTimeStampFn),
TimestampKind.EVENT,
slidingWindowOf1Min.toSlidingWindowPolicy(),
aggrOp,
(ignored, timestamp, key, value) -> new TimestampedEntry<>(ignored, timestamp, key, value)
));

Vertex slidingWin5min = dag.newVertex("slidingWin5Min", aggregateToSlidingWindowP(
singletonList(getKeyFn),
singletonList(getTimeStampFn),
TimestampKind.EVENT,
slidingWindowOf5Min.toSlidingWindowPolicy(),
aggrOp,
(ignored, timestamp, key, value) -> new TimestampedEntry<>(ignored, timestamp, key, value)
));

Vertex map30Seconds = dag.newVertex(MAP_NAME_30_SECONDS, writeMapP(MAP_NAME_30_SECONDS));
Vertex map1Min = dag.newVertex(MAP_NAME_1_MINUTE, writeMapP(MAP_NAME_1_MINUTE));
Vertex map5Min = dag.newVertex(MAP_NAME_5_MINUTE, writeMapP(MAP_NAME_5_MINUTE));

Vertex loggerSink = dag.newVertex("logger", DiagnosticProcessors.writeLoggerP());

return dag.edge(between(twitterSource, insertWm))
.edge(between(insertWm, relevance))
.edge(between(relevance, sentiment))
.edge(from(sentiment, 0).to(slidingWin30sec).partitioned(entryKey()).distributed())
.edge(from(sentiment, 1).to(slidingWin1min).partitioned(entryKey()).distributed())
.edge(from(sentiment, 2).to(slidingWin5min).partitioned(entryKey()).distributed())
.edge(between(slidingWin30sec, map30Seconds))
.edge(between(slidingWin1min, map1Min))
.edge(between(slidingWin5min, map5Min));

StreamStage<Entry<String, Double>> tweetsWithSentiment = pipeline
.drawFrom(StreamTwitterP.streamTwitter(properties, terms))
.addTimestamps()
.flatMap(JetCoinTrend::flatMapToRelevant)
.customTransform("sentiment", SentimentProcessor::new);

AggregateOperation1<Entry<String, Double>, ?, Tuple2<Double, Long>> aggrOp =
allOf(averagingDouble(Entry::getValue), counting());

tweetsWithSentiment.window(sliding(30_000, 10_000))
.groupingKey(entryKey())
.aggregate(aggrOp)
.drainTo(map(MAP_NAME_30_SECONDS));

tweetsWithSentiment.window(sliding(60_000, 10_000))
.groupingKey(entryKey())
.aggregate(aggrOp)
.drainTo(map(MAP_NAME_1_MINUTE));

tweetsWithSentiment.window(sliding(300_000, 10_000))
.groupingKey(entryKey())
.aggregate(aggrOp)
.drainTo(map(MAP_NAME_5_MINUTE));

return pipeline;
}


// Returns a traverser that flat-maps a tweet to (coin, tweet) pairs, appending one pair for every coin keyword the tweet text contains
private static Traverser<? extends TimestampedEntry<String, String>> flatMapToRelevant(TimestampedEntry<Object, String> e) {
AppendableTraverser<TimestampedEntry<String, String>> traverser = new AppendableTraverser<>(4);
String text = e.getValue();
private static Traverser<? extends Entry<String, String>> flatMapToRelevant(String text) {
AppendableTraverser<Entry<String, String>> traverser = new AppendableTraverser<>(4);
for (String coin : CoinDefs.COIN_MAP.keySet()) {
for (String keyword : CoinDefs.COIN_MAP.get(coin)) {
if (text.contains(keyword)) {
traverser.append(new TimestampedEntry<>(e.getTimestamp(), coin, e.getValue()));
traverser.append(entry(coin, text));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class SentimentProcessor extends AbstractProcessor {
});

@Override
protected boolean tryProcess(int ordinal, @Nonnull Object item) throws Exception {
protected boolean tryProcess(int ordinal, @Nonnull Object item) {
return mapper.tryProcess(((TimestampedEntry<String, String>) item));
}
}
24 changes: 12 additions & 12 deletions cryptocurrency-realtime-trend/src/main/java/StreamTwitterP.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,35 @@
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.CloseableProcessorSupplier;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.datamodel.TimestampedEntry;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamSource;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.json.JSONObject;

import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nonnull;
import org.json.JSONObject;

import static com.hazelcast.jet.Traversers.traverseIterable;
import static com.hazelcast.jet.core.ProcessorMetaSupplier.dontParallelize;
import static java.lang.System.currentTimeMillis;

public class StreamTwitterP extends AbstractProcessor implements Closeable {

private final static Object EMPTY = new Object();

private final Properties properties;
private final List<String> terms;
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
private final ArrayList<String> buffer = new ArrayList<>();

private Traverser<TimestampedEntry> traverser;
private Traverser<String> traverser;
private BasicClient client;

private StreamTwitterP(Properties properties, List<String> terms) {
Expand All @@ -43,7 +39,7 @@ private StreamTwitterP(Properties properties, List<String> terms) {
}

@Override
protected void init(@Nonnull Context context) throws Exception {
protected void init(@Nonnull Context context) {
StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
endpoint.trackTerms(terms);

Expand Down Expand Up @@ -74,7 +70,7 @@ public boolean complete() {
traverser = traverseIterable(buffer)
.map(JSONObject::new)
.filter(json -> json.has("text"))
.map(json -> new TimestampedEntry<>(currentTimeMillis(), EMPTY, json.getString("text")));
.map(json -> json.getString("text"));
}
}
if (emitFromTraverser(traverser)) {
Expand All @@ -94,7 +90,7 @@ public boolean isCooperative() {
}

@Override
public void close() throws IOException {
public void close() {
if (client != null) {
client.stop();
}
Expand All @@ -104,4 +100,8 @@ public static ProcessorMetaSupplier streamTwitterP(Properties properties, List<S
return dontParallelize(new CloseableProcessorSupplier<>(() -> new StreamTwitterP(properties, terms)));
}

/**
 * Exposes the Twitter processor as a Pipeline API stream source.
 *
 * <p>Obtains the processor meta-supplier from
 * {@link #streamTwitterP(Properties, List)} and registers it under the
 * source name {@code "twitterSource"}.
 *
 * @param properties configuration forwarded unchanged to {@code streamTwitterP}
 * @param terms      terms forwarded unchanged to {@code streamTwitterP}
 * @return a stream source of {@code String} items backed by the Twitter processor
 */
public static StreamSource<String> streamTwitter(Properties properties, List<String> terms) {
    ProcessorMetaSupplier twitterMetaSupplier = streamTwitterP(properties, terms);
    return Sources.streamFromProcessor("twitterSource", twitterMetaSupplier);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static ProcessorMetaSupplier streamAircraftP(String url, long intervalMil
}

public static StreamSource<Aircraft> streamAircraft(String url, long intervalMillis) {
return streamFromProcessor("streamAircraft", streamAircraftP(url, intervalMillis));
return streamFromProcessor("streamAircraft", streamAircraftP(url, intervalMillis));
}

}
Loading

0 comments on commit 1f4c824

Please sign in to comment.