diff --git a/README.md b/README.md index 5cf200a2..61c32d9d 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ The related documentation and instructions are [here](hadoop-connector-examples) | [`turbineheatprocessor`](scenarios/turbine-heat-processor) | A Flink streaming application for processing temperature data from a Pravega stream produced by the `turbineheatsensor` app. The application computes a daily summary of the temperature range observed on that day by each sensor. | [Java](scenarios/turbine-heat-processor/src/main/java/io/pravega/turbineheatprocessor), [Scala](scenarios/turbine-heat-processor/src/main/scala/io/pravega/turbineheatprocessor) | [`anomaly-detection`](scenarios/anomaly-detection) | A Flink streaming application for detecting anomalous input patterns using a finite-state machine. | [Java](scenarios/anomaly-detection/src/main/java/io/pravega/anomalydetection) | [`pravega-flink-connector-sql-samples`](scenarios/pravega-flink-connector-sql-samples) | Flink connector table api/sql samples. | [Java](scenarios/pravega-flink-connector-sql-samples/src/main/java/io/pravega/connectors.nytaxi) +| [`mqtt-pravega-bridge`](scenarios/mqtt-pravega-bridge) | A sample application reads events from MQTT and writes them to a Pravega stream. | [Java](scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega) # Build Instructions diff --git a/gradle.properties b/gradle.properties index da7041ef..bdc2b923 100644 --- a/gradle.properties +++ b/gradle.properties @@ -10,6 +10,13 @@ ### Pravega dependencies pravegaVersion=0.6.0-50.076afef-SNAPSHOT +#3rd party Versions +guavaVersion=20.0 +junitVersion=4.12 +qosLogbackVersion=1.2.3 +pahoClientMqttv3Version=1.2.0 +slf4jApiVersion=1.7.25 + ### Pravega-samples output library samplesVersion=0.6.0-SNAPSHOT diff --git a/scenarios/mqtt-pravega-bridge/README.md b/scenarios/mqtt-pravega-bridge/README.md new file mode 100644 index 00000000..9bd7c297 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/README.md @@ -0,0 +1,41 @@ +# MQTT to Pravega Bridge + +This sample application reads events from MQTT and writes them to a Pravega stream. + +## Usage + +- Install Mosquitto MQTT broker and clients. + ``` + sudo apt-get install mosquitto mosquitto-clients + ``` + +- If not automatically started, start Mosquitto broker. + ``` + mosquitto + ``` + +- Edit the file src/main/dist/conf/bridge.properties + to specify your Pravega controller URI (controllerUri) as + `tcp://HOST_IP:9090`. + +- Run the application: + ``` + ../../gradlew run + ``` + +- Alternatively, you may run in IntelliJ. + Run the class ApplicationMain with the following parameters: + ``` + scenarios/mqtt-pravega-bridge/src/main/dist/conf + ``` + +- Publish a sample MQTT message. + Note that the topic must be formatted as "topic/car_id" as shown below. + ``` + mosquitto_pub -t center/0001 -m "12,34,56.78" + ``` + +- You should see the following application output: + ``` + [MQTT Call: CanDataReader] io.pravega.example.mqtt.MqttListener: Writing Data Packet: CarID: 0001 Timestamp: 1551671403118 Payload: [B@2813d92f annotation: null + ``` diff --git a/scenarios/mqtt-pravega-bridge/build.gradle b/scenarios/mqtt-pravega-bridge/build.gradle new file mode 100644 index 00000000..7724cd8a --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/build.gradle @@ -0,0 +1,55 @@ +plugins { + id 'com.github.johnrengelman.shadow' version '1.2.4' +} + +apply plugin: 'java' +apply plugin: "distribution" +apply plugin: 'application' + +version = samplesVersion +sourceCompatibility = 1.8 +targetCompatibility = 1.8 +mainClassName = 'io.pravega.example.mqtt.ApplicationMain' +applicationDefaultJvmArgs = ["-Dlog4j.configuration=file:conf/log4j.properties"] +archivesBaseName = 'pravega-mqtt-bridge' + +repositories { + mavenCentral() + maven { + url "https://repository.apache.org/snapshots" + } + maven { + url "https://oss.sonatype.org/content/repositories/snapshots" + } +} + +dependencies { + compile "org.eclipse.paho:org.eclipse.paho.client.mqttv3:${pahoClientMqttv3Version}" + compile "org.slf4j:slf4j-api:${slf4jApiVersion}" + compile "ch.qos.logback:logback-classic:${qosLogbackVersion}" + compile "com.google.guava:guava:${guavaVersion}" + compile "io.pravega:pravega-client:${pravegaVersion}" + testCompile "junit:junit:${junitVersion}" +} + +shadowJar { + dependencies { + include dependency("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0") + } +} + +distributions { + main { + baseName = archivesBaseName + contents { + into('lib') { + from shadowJar + from(project.configurations.shadow) + } + } + } +} + +run { + args = ["src/main/dist/conf"] +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/dist/conf/bridge.properties b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/bridge.properties new file mode 100644 index 00000000..107162e2 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/bridge.properties @@ -0,0 +1,14 @@ +# Pravega Properties +controllerUri=tcp://127.0.0.1:9090 +scope=examples +stream=mqtt-example +scaling.targetRate=100 +scaling.scaleFactor=3 +scaling.minNumSegments=3 + +# MQTT Properties +brokerUri=tcp://127.0.0.1:1883 +topic=center/# +clientId=CanDataReader +userName=admin +password=password diff --git a/scenarios/mqtt-pravega-bridge/src/main/dist/conf/log4j.properties b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/log4j.properties new file mode 100644 index 00000000..2614ad13 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/log4j.properties @@ -0,0 +1,13 @@ +log4j.rootLogger=INFO, stdout, file + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + +log4j.appender.file=org.apache.log4j.RollingFileAppender +log4j.appender.file.File=workspace.log +log4j.appender.file.MaxFileSize=10MB +log4j.appender.file.MaxBackupIndex=10 +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n \ No newline at end of file diff --git a/scenarios/mqtt-pravega-bridge/src/main/dist/conf/logback.xml b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/logback.xml new file mode 100644 index 00000000..34f9c7c0 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/logback.xml @@ -0,0 +1,24 @@ + + + + + System.out + + %-5level [%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %logger{36}: %msg%n + + + + + + + diff --git a/scenarios/mqtt-pravega-bridge/src/main/dist/conf/logger.xml b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/logger.xml new file mode 100644 index 00000000..896de023 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/dist/conf/logger.xml @@ -0,0 +1,32 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + logFile.log + + + workspace.%d{yyyy-MM-dd}.log + + + 5 + + + + %d{HH:mm:ss.SSS} %-4relative [%thread] %-5level %logger{35} - %msg%n + + + + + + + + + + \ No newline at end of file diff --git a/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/ApplicationArguments.java b/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/ApplicationArguments.java new file mode 100644 index 00000000..8bbda46c --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/ApplicationArguments.java @@ -0,0 +1,76 @@ +package io.pravega.example.mqtt; + +import com.google.common.base.Preconditions; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Properties; + +public class ApplicationArguments { + + private final PravegaArgs pravegaArgs = new PravegaArgs(); + private final MqttArgs mqttArgs = new MqttArgs(); + + public ApplicationArguments(String confDir) throws Exception { + loadProperties(confDir); + } + + private void loadProperties(String confDir) throws Exception{ + Properties prop = new Properties(); + try ( + InputStream inputStream = new FileInputStream(confDir + File.separator + "bridge.properties"); + ) + { + prop.load(inputStream); + + pravegaArgs.controllerUri = prop.getProperty("controllerUri"); + pravegaArgs.scope = prop.getProperty("scope"); + pravegaArgs.stream = prop.getProperty("stream"); + pravegaArgs.targetRate = Integer.parseInt(prop.getProperty("scaling.targetRate")); + pravegaArgs.scaleFactor = Integer.parseInt(prop.getProperty("scaling.scaleFactor")); + pravegaArgs.minNumSegments = Integer.parseInt(prop.getProperty("scaling.minNumSegments")); + + mqttArgs.brokerUri = prop.getProperty("brokerUri"); + mqttArgs.topic = prop.getProperty("topic"); + mqttArgs.clientId = prop.getProperty("clientId"); + mqttArgs.userName = prop.getProperty("userName"); + mqttArgs.password = prop.getProperty("password"); + + Preconditions.checkNotNull(pravegaArgs.controllerUri, "Pravega Controller URI is missing"); + Preconditions.checkNotNull(pravegaArgs.scope, "Pravega scope is missing"); + Preconditions.checkNotNull(pravegaArgs.stream, "Pravega stream is missing"); + + Preconditions.checkNotNull(mqttArgs.brokerUri, "MQTT Broker URI is missing"); + Preconditions.checkNotNull(mqttArgs.topic, "MQTT topic is missing"); + Preconditions.checkNotNull(mqttArgs.clientId, "MQTT clientId is missing"); + Preconditions.checkNotNull(mqttArgs.userName, "MQTT userName is missing"); + Preconditions.checkNotNull(mqttArgs.password, "MQTT password is missing"); + } + } + + public PravegaArgs getPravegaArgs() { + return pravegaArgs; + } + + public MqttArgs getMqttArgs() { + return mqttArgs; + } + + public static class PravegaArgs { + protected String controllerUri; + protected String scope; + protected String stream; + protected int targetRate; + protected int scaleFactor; + protected int minNumSegments; + } + + public static class MqttArgs { + protected String brokerUri; + protected String topic; + protected String clientId; + protected String userName; + protected String password; + } +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/ApplicationMain.java b/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/ApplicationMain.java new file mode 100644 index 00000000..e7bc937c --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/ApplicationMain.java @@ -0,0 +1,56 @@ +package io.pravega.example.mqtt; + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; + +public class ApplicationMain { + + private static Logger log = LoggerFactory.getLogger( ApplicationMain.class ); + + public static void main(String ... args) { + + if (args.length != 1) { + log.error("Missing required arguments. Usage: java io.pravega.example.mqtt.ApplicationMain "); + return; + } + + String confDir = args[0]; + log.info("loading configurations from {}", confDir); + + final CountDownLatch latch = new CountDownLatch(1); + + try { + ApplicationArguments applicationArguments = new ApplicationArguments(confDir); + MqttListener listener = new MqttListener(applicationArguments.getPravegaArgs()); + + MqttConnectionBuilder builder = new MqttConnectionBuilder(); + builder.brokerUri(applicationArguments.getMqttArgs().brokerUri); + builder.topic(applicationArguments.getMqttArgs().topic); + builder.clientId(applicationArguments.getMqttArgs().clientId); + builder.userName(applicationArguments.getMqttArgs().userName); + builder.password(applicationArguments.getMqttArgs().password); + builder.bridge(listener); + + MqttClient mqttClient = builder.connect(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("Going to close the application"); + if (mqttClient != null) { + try { + mqttClient.close(); + } catch (MqttException e) { + log.error("Exception Occurred while closing MQTT client", e); + } + } + latch.countDown(); + })); + } catch (Exception e) { + log.error("Exception Occurred", e); + } + + } +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/DataPacket.java b/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/DataPacket.java new file mode 100644 index 00000000..576a4547 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/DataPacket.java @@ -0,0 +1,60 @@ +package io.pravega.example.mqtt; + +import java.io.Serializable; +import java.util.Arrays; + +/** + * Wrapper class that holds raw data and its corresponding annotation info + */ +public class DataPacket implements Serializable { + + private String carId; + + private long timestamp; + + private byte[] payload; + + private byte[] annotation; + + public String getCarId() { + return carId; + } + + public void setCarId(String carId) { + this.carId = carId; + } + + public byte[] getPayload() { + return payload; + } + + public void setPayload(byte[] payload) { + this.payload = payload; + } + + public byte[] getAnnotation() { + return annotation; + } + + public void setAnnotation(byte[] annotation) { + this.annotation = annotation; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public String toString() { + return "DataPacket{" + + "carId='" + carId + '\'' + + ", timestamp=" + timestamp + + ", payload=" + Arrays.toString(payload) + + ", annotation=" + Arrays.toString(annotation) + + '}'; + } +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/MqttConnectionBuilder.java b/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/MqttConnectionBuilder.java new file mode 100644 index 00000000..12d3c309 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/MqttConnectionBuilder.java @@ -0,0 +1,82 @@ +package io.pravega.example.mqtt; + +import com.google.common.base.Preconditions; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; + +public class MqttConnectionBuilder { + + private String brokerUri; + private String topic; + private String clientId; + private String userName; + private String password; + private MqttCallback mqttCallback; + private MqttClientPersistence persistence; + + public MqttConnectionBuilder brokerUri(String brokerUri) { + this.brokerUri = brokerUri; + return this; + } + + public MqttConnectionBuilder topic(String topic) { + this.topic = topic; + return this; + } + + public MqttConnectionBuilder clientId(String clientId) { + this.clientId = clientId; + return this; + } + + public MqttConnectionBuilder userName(String userName) { + this.userName = userName; + return this; + } + + public MqttConnectionBuilder password(String password) { + this.password = password; + return this; + } + + public MqttConnectionBuilder bridge(MqttCallback mqttCallback) { + this.mqttCallback = mqttCallback; + return this; + } + + public MqttConnectionBuilder persistence(MqttClientPersistence persistence) { + this.persistence = persistence; + return this; + } + + public MqttClient connect() throws MqttException { + + Preconditions.checkNotNull(brokerUri, "Missing MQTT broker information"); + Preconditions.checkNotNull(topic, "Missing MQTT topic information"); + Preconditions.checkNotNull(clientId, "Missing MQTT clientId"); + Preconditions.checkNotNull(userName, "Missing MQTT userName"); + Preconditions.checkNotNull(password, "Missing MQTT password"); + Preconditions.checkNotNull(mqttCallback, "Missing MQTT callback handler"); + + + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName(userName); + mqttConnectOptions.setPassword(password.toCharArray()); + + MqttClient mqttClient; + if (persistence != null) { + mqttClient = new MqttClient(brokerUri, clientId, persistence); + } else { + mqttClient = new MqttClient(brokerUri, clientId); + } + + mqttClient.setCallback(mqttCallback); + mqttClient.connect(mqttConnectOptions); + mqttClient.subscribe(topic); + + return mqttClient; + } +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/MqttListener.java b/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/MqttListener.java new file mode 100644 index 00000000..5febe16f --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/MqttListener.java @@ -0,0 +1,51 @@ +package io.pravega.example.mqtt; + +import io.pravega.client.stream.EventStreamWriter; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; + +/** + * MQTT Listener call back handler that listens to specified MQTT topic and fetches the posted data + */ + +public class MqttListener implements MqttCallback { + + private static Logger log = LoggerFactory.getLogger( MqttListener.class ); + + private final EventStreamWriter writer; + + public MqttListener(ApplicationArguments.PravegaArgs pravegaArgs) { + writer = PravegaHelper.getStreamWriter(pravegaArgs); + } + + @Override + public void connectionLost(Throwable cause) { + log.debug("Received connection lost message. Reason: {}", cause); + writer.close(); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + log.debug("Received new message from the topic: {}", topic); + + String carId = topic.split("/")[1]; + DataPacket packet = new DataPacket(); + packet.setTimestamp(System.currentTimeMillis()); + packet.setCarId(carId); + packet.setPayload(message.getPayload()); + + log.info("Writing Data Packet: {}", packet); + + CompletableFuture future = writer.writeEvent(carId, packet); + future.get(); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) {} + +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/PravegaHelper.java b/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/PravegaHelper.java new file mode 100644 index 00000000..4055d7df --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/java/io/pravega/example/mqtt/PravegaHelper.java @@ -0,0 +1,39 @@ +package io.pravega.example.mqtt; + +import io.pravega.client.ClientConfig; +import io.pravega.client.EventStreamClientFactory; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.EventStreamWriter; +import io.pravega.client.stream.EventWriterConfig; +import io.pravega.client.stream.ScalingPolicy; +import io.pravega.client.stream.StreamConfiguration; +import io.pravega.client.stream.impl.JavaSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; + +public class PravegaHelper { + + private static Logger log = LoggerFactory.getLogger( PravegaHelper.class ); + + public static EventStreamWriter getStreamWriter(ApplicationArguments.PravegaArgs pravegaArgs) { + log.info("Connecting to Pravega URI: {}, Scope: {}, Stream: {}", + pravegaArgs.controllerUri, pravegaArgs.scope, pravegaArgs.stream); + + try (StreamManager streamManager = StreamManager.create(URI.create(pravegaArgs.controllerUri))) { + StreamConfiguration streamConfig = StreamConfiguration.builder() + .scalingPolicy(ScalingPolicy.byEventRate(pravegaArgs.targetRate, pravegaArgs.scaleFactor, pravegaArgs.minNumSegments)) + .build(); + streamManager.createStream(pravegaArgs.scope, pravegaArgs.stream, streamConfig); + } + + URI controllerURI = URI.create(pravegaArgs.controllerUri); + ClientConfig clientConfig = ClientConfig.builder().controllerURI(controllerURI).build(); + EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(pravegaArgs.scope, clientConfig); + EventStreamWriter writer = clientFactory.createEventWriter(pravegaArgs.stream, + new JavaSerializer(), + EventWriterConfig.builder().build()); + return writer; + } +} diff --git a/scenarios/mqtt-pravega-bridge/src/main/resources/logback.xml b/scenarios/mqtt-pravega-bridge/src/main/resources/logback.xml new file mode 100644 index 00000000..34f9c7c0 --- /dev/null +++ b/scenarios/mqtt-pravega-bridge/src/main/resources/logback.xml @@ -0,0 +1,24 @@ + + + + + System.out + + %-5level [%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %logger{36}: %msg%n + + + + + + + diff --git a/settings.gradle b/settings.gradle index 34daecb2..b4672502 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,3 +18,4 @@ include 'scenarios/turbine-heat-sensor' include 'scenarios/turbine-heat-processor' include 'scenarios/anomaly-detection' include 'scenarios/pravega-flink-connector-sql-samples' +include 'scenarios/mqtt-pravega-bridge'