Skip to content

Commit

Permalink
Removed custom sink processors (hazelcast#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
Emin Demirci authored Mar 6, 2018
1 parent 45260fd commit cdcd043
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,19 @@
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.WindowDefinition;
import com.hazelcast.map.listener.EntryAddedListener;
import java.io.BufferedOutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.function.Consumer;
import org.python.core.PyFloat;
import org.python.core.PyInteger;
import org.python.core.PyList;
import org.python.core.PyString;
import org.python.core.PyTuple;
import org.python.modules.cPickle;

import static com.hazelcast.jet.Util.entry;
import static com.hazelcast.jet.aggregate.AggregateOperations.allOf;
Expand All @@ -43,6 +53,8 @@
import static com.hazelcast.jet.demo.util.Util.inParis;
import static com.hazelcast.jet.demo.util.Util.inTokyo;
import static com.hazelcast.jet.function.DistributedComparator.comparingInt;
import static com.hazelcast.jet.impl.util.Util.uncheckCall;
import static com.hazelcast.jet.impl.util.Util.uncheckRun;
import static java.util.Collections.emptySortedMap;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -131,8 +143,6 @@ public static void main(String[] args) {
private static Pipeline buildPipeline() {
Pipeline p = Pipeline.create();

Sink<Object> graphiteSink = GraphiteSink.sink("127.0.0.1", 2004);

SlidingWindowDef slidingWindow = WindowDefinition.sliding(60_000, 30_000);
StreamStage<TimestampedEntry<Long, Aircraft>> flights = p
.drawFrom(streamAircraft(SOURCE_URL, 10000))
Expand Down Expand Up @@ -160,11 +170,12 @@ private static Pipeline buildPipeline() {

landingFlights.drainTo(Sinks.map(LANDING_MAP)); // (aircraft_id, aircraft)

StreamStage<TimestampedEntry<String, Entry<Aircraft, Integer>>> maxNoise = flights
StreamStage<TimestampedEntry<String, Integer>> maxNoise = flights
.map(e -> entry(e.getValue(), getNoise(e.getValue()))) // (aircraft, noise)
.window(slidingWindow)
.groupingKey(e -> e.getKey().getAirport() + "_AVG_NOISE")
.aggregate(maxBy(comparingInt(Entry::getValue)));
.aggregate(maxBy(comparingInt(Entry::getValue)))
.map(e -> new TimestampedEntry<>(e.getTimestamp(), e.getKey(), e.getValue().getValue()));
// (airport, max_noise)

StreamStage<TimestampedEntry<String, Double>> co2Emission = flights
Expand All @@ -174,11 +185,38 @@ private static Pipeline buildPipeline() {
.aggregate(summingDouble(Entry::getValue));
// (airport, total_co2)

Sink<TimestampedEntry> graphiteSink = buildGraphiteSink("127.0.0.1", 2004);

p.drainTo(graphiteSink, co2Emission, maxNoise, landingFlights, takingOffFlights);
return p;
}

/**
* Sink implementation which forwards the items it receives to the Graphite.
* Graphite's Pickle Protocol is used for communication between Jet and Graphite.
*
* @param host Graphite host
* @param port Graphite port
*/
private static Sink<TimestampedEntry> buildGraphiteSink(String host, int port) {
return Sinks.<BufferedOutputStream, TimestampedEntry>builder(instance -> uncheckCall(()
-> new BufferedOutputStream(new Socket(host, port).getOutputStream())
))
.onReceiveFn((bos, entry) -> uncheckRun(() -> {
GraphiteMetric metric = new GraphiteMetric();
metric.from(entry);

PyString payload = cPickle.dumps(metric.getAsList(), 2);
byte[] header = ByteBuffer.allocate(4).putInt(payload.__len__()).array();

bos.write(header);
bos.write(payload.toBytes());
}))
.flushFn((bos) -> uncheckRun(bos::flush))
.destroyFn((bos) -> uncheckRun(bos::close))
.build();
}

/**
* Returns the average C02 emission on landing/take-offfor the aircraft
*
Expand All @@ -189,6 +227,12 @@ private static Double getCO2Emission(Aircraft aircraft) {
return typeToLTOCycyleC02Emission.getOrDefault(aircraft.getType(), 0d);
}

/**
* Returns the noise level at the current altitude of the aircraft
*
* @param aircraft
* @return noise level of the aircraft
*/
private static Integer getNoise(Aircraft aircraft) {
Long altitude = aircraft.getAlt();
SortedMap<Integer, Integer> lookupTable = getPhaseNoiseLookupTable(aircraft);
Expand All @@ -198,16 +242,20 @@ private static Integer getNoise(Aircraft aircraft) {
return lookupTable.tailMap(altitude.intValue()).values().iterator().next();
}


private static Aircraft assignAirport(Aircraft ac) {
if (ac.getAlt() > 0 && !ac.isGnd()) {
String airport = getAirport(ac.lon, ac.lat);
/**
* Sets the airport field of the aircraft by looking at the coordinates of it
*
* @param aircraft
*/
private static Aircraft assignAirport(Aircraft aircraft) {
if (aircraft.getAlt() > 0 && !aircraft.isGnd()) {
String airport = getAirport(aircraft.lon, aircraft.lat);
if (airport == null) {
return null;
}
ac.setAirport(airport);
aircraft.setAirport(airport);
}
return ac;
return aircraft;
}

/**
Expand Down Expand Up @@ -281,9 +329,73 @@ private static VerticalDirection getVerticalDirection(double coefficient) {
}
}

/**
* Attaches a listener to {@link IMapJet} which passes added items to the specified consumer
*
* @param map map instance which the listener will be added
* @param consumer aircraft consumer that the added items will be passed on.
*/
private static void addListener(IMapJet<Long, Aircraft> map, Consumer<Aircraft> consumer) {
map.addEntryListener((EntryAddedListener<Long, Aircraft>) event ->
consumer.accept(event.getValue()), true);
}

/**
* A data transfer object for Graphite
*/
private static class GraphiteMetric {
PyString metricName;
PyInteger timestamp;
PyFloat metricValue;

private GraphiteMetric() {
}

private void fromAirCraftEntry(TimestampedEntry<Long, Aircraft> aircraftEntry) {
Aircraft aircraft = aircraftEntry.getValue();
metricName = new PyString(replaceWhiteSpace(aircraft.getAirport()) + "." + aircraft.verticalDirection);
timestamp = new PyInteger(getEpochSecond(aircraft.getPosTime()));
metricValue = new PyFloat(1);
}

private void fromMaxNoiseEntry(TimestampedEntry<String, Integer> entry) {
metricName = new PyString(replaceWhiteSpace(entry.getKey()));
timestamp = new PyInteger(getEpochSecond(entry.getTimestamp()));
metricValue = new PyFloat(entry.getValue());
}

private void fromTotalC02Entry(TimestampedEntry<String, Double> entry) {
metricName = new PyString(replaceWhiteSpace(entry.getKey()));
timestamp = new PyInteger(getEpochSecond(entry.getTimestamp()));
metricValue = new PyFloat(entry.getValue());
}

void from(TimestampedEntry entry) {
if (entry.getKey() instanceof Long) {
TimestampedEntry<Long, Aircraft> aircraftEntry = entry;
fromAirCraftEntry(aircraftEntry);
} else {
if (entry.getValue() instanceof Double) {
fromTotalC02Entry(entry);
} else {
fromMaxNoiseEntry(entry);
}
}
}

PyList getAsList() {
PyList list = new PyList();
PyTuple metric = new PyTuple(metricName, new PyTuple(timestamp, metricValue));
list.add(metric);
return list;
}

private int getEpochSecond(long millis) {
return (int) Instant.ofEpochMilli(millis).getEpochSecond();
}

private String replaceWhiteSpace(String string) {
return string.replace(" ", "_");
}
}
}

This file was deleted.

84 changes: 0 additions & 84 deletions realtime-image-recognition/src/main/java/GUISink.java

This file was deleted.

Loading

0 comments on commit cdcd043

Please sign in to comment.