Skip to content

Commit

Permalink
Minor improvements to Cryptocurrency Demo App (hazelcast#67)
Browse files Browse the repository at this point in the history
* Minor improvements:
1. Print full name of each currency rather than the symbol, which most people don't know
2. Remove SLF4J warning by adding the noop mvn dep
3. Remove the @OverRide annotation from close which does not override any method in the superclass

* Added Override annotations

* Changed slf4j-nop to slf4j-log4j12 dependency

* Shared SentimentAnalyzer instance locally
  • Loading branch information
Emin Demirci authored Apr 9, 2018
1 parent e9dda19 commit 8a16212
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 35 deletions.
5 changes: 5 additions & 0 deletions cryptocurrency-realtime-trend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,15 @@
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.StreamStageWithGrouping;
import edu.stanford.nlp.util.CoreMap;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import javax.annotation.Nullable;

import static com.hazelcast.jet.Util.entry;
import static com.hazelcast.jet.aggregate.AggregateOperations.allOf;
import static com.hazelcast.jet.aggregate.AggregateOperations.averagingDouble;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.demo.util.Util.MAP_NAME_1_MINUTE;
import static com.hazelcast.jet.demo.util.Util.MAP_NAME_30_SECONDS;
import static com.hazelcast.jet.demo.util.Util.MAP_NAME_5_MINUTE;
import static com.hazelcast.jet.demo.util.Util.loadProperties;
import static com.hazelcast.jet.demo.util.Util.loadTerms;
import static com.hazelcast.jet.demo.util.Util.startConsolePrinterThread;
import static com.hazelcast.jet.aggregate.AggregateOperations.*;
import static com.hazelcast.jet.demo.util.Util.*;
import static com.hazelcast.jet.function.DistributedFunctions.entryKey;
import static com.hazelcast.jet.pipeline.Sinks.map;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;
Expand Down Expand Up @@ -76,7 +70,7 @@
* │ │ │
* v v v
* ┌────────────────────────┐ ┌────────────────────────┐ ┌────────────────────────┐
* │ Calcutate 5min │ │ Calcutate 30sec │ │ Calcutate 1min │
* │ Calculate 5min │ │ Calculate 30sec │ │ Calculate 1min │
* │Average with Event Count│ │Average with Event Count│ │Average with Event Count│
* └───────────┬────────────┘ └─────────────┬──────────┘ └───────────────┬────────┘
* │ │ │
Expand All @@ -92,7 +86,7 @@ public class JetCoinTrend {
}

public static void main(String[] args) {
System.out.println("DISCLAIMER: This is not an investment advice");
System.out.println("DISCLAIMER: This is not investment advice");

Pipeline pipeline = buildPipeline();
// Start Jet
Expand All @@ -119,24 +113,26 @@ private static Pipeline buildPipeline() {
.drawFrom(StreamTwitterP.streamTwitter(properties, terms))
.addTimestamps()
.flatMap(JetCoinTrend::flatMapToRelevant)
.mapUsingContext(ContextFactory.withCreateFn(jet -> new SentimentAnalyzer()),
.mapUsingContext(ContextFactory
.withCreateFn(jet -> new SentimentAnalyzer())
.shareLocally(),
JetCoinTrend::calculateSentiment)
.groupingKey(entryKey());

AggregateOperation1<Entry<String, Double>, ?, Tuple2<Double, Long>> aggrOp =
allOf(averagingDouble(Entry::getValue), counting());

tweetsWithSentiment.window(sliding(30_000, 10_000))
.aggregate(aggrOp)
.drainTo(map(MAP_NAME_30_SECONDS));
.aggregate(aggrOp)
.drainTo(map(MAP_NAME_30_SECONDS));

tweetsWithSentiment.window(sliding(60_000, 10_000))
.aggregate(aggrOp)
.drainTo(map(MAP_NAME_1_MINUTE));
.aggregate(aggrOp)
.drainTo(map(MAP_NAME_1_MINUTE));

tweetsWithSentiment.window(sliding(300_000, 10_000))
.aggregate(aggrOp)
.drainTo(map(MAP_NAME_5_MINUTE));
.aggregate(aggrOp)
.drainTo(map(MAP_NAME_5_MINUTE));

return pipeline;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@ public class CoinDefs {

public static Map<String, List<String>> COIN_MAP = new HashMap<>();

public static final int SYMBOL = 0;

static {
COIN_MAP.put("BTC", Arrays.asList("bitcoin", "#btc", "#bitcoin"));
COIN_MAP.put("ETH", Arrays.asList("ether", "ethereum", "#eth", "#ether", "#ethereum"));
COIN_MAP.put("XRP", Arrays.asList("ripple", "#xrp", "#ripple"));
COIN_MAP.put("BCH", Arrays.asList("bitcoin cash", "#bitcoincash", "#bch"));
COIN_MAP.put("ADA", Arrays.asList("cardano", "#ada", "#cardano"));
COIN_MAP.put("LTC", Arrays.asList("litecoin", "#ltc", "#litecoin"));
COIN_MAP.put("XLM", Arrays.asList("stellar", "#xlm", "#steller"));
COIN_MAP.put("BTC", Arrays.asList("Bitcoin", "#btc", "#bitcoin"));
COIN_MAP.put("ETH", Arrays.asList("Ether", "ethereum", "#eth", "#ether", "#ethereum"));
COIN_MAP.put("XRP", Arrays.asList("Ripple", "#xrp", "#ripple"));
COIN_MAP.put("BCH", Arrays.asList("Bitcoin Cash", "#bitcoincash", "#bch"));
COIN_MAP.put("ADA", Arrays.asList("Cardano", "#ada", "#cardano"));
COIN_MAP.put("LTC", Arrays.asList("Litecoin", "#ltc", "#litecoin"));
COIN_MAP.put("XLM", Arrays.asList("Stellar", "#xlm", "#steller"));
COIN_MAP.put("XEM", Arrays.asList("NEM", "#xem", "#nem"));
COIN_MAP.put("EOS", Arrays.asList("EOS", "#eos"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.Set;
import java.util.concurrent.locks.LockSupport;

import static com.hazelcast.jet.demo.common.CoinDefs.COIN_MAP;
import static com.hazelcast.jet.demo.common.CoinDefs.SYMBOL;
import static com.hazelcast.util.ExceptionUtil.rethrow;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand Down Expand Up @@ -44,32 +46,48 @@ public static void startConsolePrinterThread(JetInstance jet) {
while (running) {
Set<String> coins = new HashSet<>();

if (map30secs.isEmpty()) {
continue;
}

coins.addAll(map30secs.keySet());
coins.addAll(map1min.keySet());
coins.addAll(map5min.keySet());

System.out.println("/------+---------------+---------------+----------------\\");
System.out.println("| | Sentiment (tweet count) |");
System.out.println("| Coin | Last 30 sec | Last minute | Last 5 minutes |");
System.out.println("|------+---------------+---------------+----------------|");
coins.forEach((c) ->
System.out.println("\n");
System.out.println("/----------------+---------------+---------------+----------------\\");
System.out.println("| | Sentiment (tweet count) |");
System.out.println("| Coin | Last 30 sec | Last minute | Last 5 minutes |");
System.out.println("|----------------+---------------+---------------+----------------|");
coins.forEach((coin) ->
System.out.format("| %s | %s | %s | %s |%n",
c, format(map30secs.get(c)), format(map1min.get(c)), format(map5min.get(c))));
System.out.println("\\------+---------------+---------------+----------------/");
coinName(coin), format(map30secs.get(coin)), format(map1min.get(coin)), format(map5min.get(coin))));
System.out.println("\\----------------+---------------+---------------+----------------/");

LockSupport.parkNanos(MILLISECONDS.toNanos(PRINT_INTERNAL_MILLIS));
}
}).start();
}

/**
* Gets the full name given the code and pads it to a length of 16 characters
*/
private static Object coinName(String coin) {
String name = COIN_MAP.get(coin).get(SYMBOL);
for (int i = name.length(); i < 13; i++){
name = name.concat(" ");
}
return name;
}

public static void stopConsolePrinterThread() {
running = false;
}


public static List<String> loadTerms() {
List<String> terms = new ArrayList<>();
CoinDefs.COIN_MAP.forEach((key, value) -> {
COIN_MAP.forEach((key, value) -> {
terms.add(key);
terms.addAll(value);
});
Expand Down
2 changes: 1 addition & 1 deletion realtime-image-recognition/src/main/java/WebcamSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public static StreamSource<SerializableBufferedImage> webcam() {
forceTotalParallelismOne(ProcessorSupplier.of(WebcamSource::new))
);
}

@Override
public void close(Throwable error) {
if (webcam != null) {
Expand Down

0 comments on commit 8a16212

Please sign in to comment.