Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 187: Add MQTT to Pravega bridge sample #188

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ The related documentation and instructions are [here](hadoop-connector-examples)
| [`turbineheatprocessor`](scenarios/turbine-heat-processor) | A Flink streaming application for processing temperature data from a Pravega stream produced by the `turbineheatsensor` app. The application computes a daily summary of the temperature range observed on that day by each sensor. | [Java](scenarios/turbine-heat-processor/src/main/java/io/pravega/turbineheatprocessor), [Scala](scenarios/turbine-heat-processor/src/main/scala/io/pravega/turbineheatprocessor)
| [`anomaly-detection`](scenarios/anomaly-detection) | A Flink streaming application for detecting anomalous input patterns using a finite-state machine. | [Java](scenarios/anomaly-detection/src/main/java/io/pravega/anomalydetection)
| [`pravega-flink-connector-sql-samples`](scenarios/pravega-flink-connector-sql-samples) | Flink connector table api/sql samples. | [Java](scenarios/pravega-flink-connector-sql-samples/src/main/java/io/pravega/connectors.nytaxi)
| [`mqtt-pravega-bridge`](scenarios/mqtt-pravega-bridge) | A sample application reads events from MQTT and writes them to a Pravega stream. | [Java](scenarios/mqtt-pravega-bridge/src/main/java/com/dell/mqtt/pravega)


# Build Instructions
Expand Down
7 changes: 7 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
pravegaVersion=0.8.0-2518.7685d8d-SNAPSHOT
pravegaKeycloakVersion=0.7.0

#3rd party Versions
guavaVersion=20.0
junitVersion=4.12
qosLogbackVersion=1.2.3
pahoClientMqttv3Version=1.2.0
slf4jApiVersion=1.7.25

### Pravega-samples output library
samplesVersion=0.8.0-SNAPSHOT

Expand Down
41 changes: 41 additions & 0 deletions scenarios/mqtt-pravega-bridge/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# MQTT to Pravega Bridge

This sample application reads events from MQTT and writes them to a Pravega stream.

## Usage

- Install Mosquitto MQTT broker and clients.
```
sudo apt-get install mosquitto mosquitto-clients
```

- If not automatically started, start Mosquitto broker.
```
mosquitto
```

- Edit the file src/main/dist/conf/bridge.properties
to specify your Pravega controller URI (controllerUri) as
`tcp://HOST_IP:9090`.

- Run the application:
```
../../gradlew run
```

- Alternatively, you may run in IntelliJ.
Run the class ApplicationMain with the following parameters:
```
scenarios/mqtt-pravega-bridge/src/main/dist/conf
```

- Publish a sample MQTT message.
Note that the topic must be formatted as "topic/car_id" as shown below.
```
mosquitto_pub -t center/0001 -m "12,34,56.78"
```

- You should see the following application output:
```
[MQTT Call: CanDataReader] io.pravega.example.mqtt.MqttListener: Writing Data Packet: CarID: 0001 Timestamp: 1551671403118 Payload: [B@2813d92f annotation: null
```
55 changes: 55 additions & 0 deletions scenarios/mqtt-pravega-bridge/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
plugins {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are adding license headers to such gradle files too, see pravega/pravega.

id 'com.github.johnrengelman.shadow' version '1.2.4'
}

apply plugin: 'java'
apply plugin: "distribution"
apply plugin: 'application'

version = samplesVersion
sourceCompatibility = 1.8
targetCompatibility = 1.8
mainClassName = 'io.pravega.example.mqtt.ApplicationMain'
applicationDefaultJvmArgs = ["-Dlog4j.configuration=file:conf/log4j.properties"]
archivesBaseName = 'pravega-mqtt-bridge'

repositories {
mavenCentral()
maven {
url "https://repository.apache.org/snapshots"
}
maven {
url "https://oss.sonatype.org/content/repositories/snapshots"
}
}

dependencies {
compile "org.eclipse.paho:org.eclipse.paho.client.mqttv3:${pahoClientMqttv3Version}"
compile "org.slf4j:slf4j-api:${slf4jApiVersion}"
compile "ch.qos.logback:logback-classic:${qosLogbackVersion}"
compile "com.google.guava:guava:${guavaVersion}"
compile "io.pravega:pravega-client:${pravegaVersion}"
testCompile "junit:junit:${junitVersion}"
}

shadowJar {
dependencies {
include dependency("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0")
}
}

distributions {
main {
baseName = archivesBaseName
contents {
into('lib') {
from shadowJar
from(project.configurations.shadow)
}
}
}
}

run {
args = ["src/main/dist/conf"]
}
14 changes: 14 additions & 0 deletions scenarios/mqtt-pravega-bridge/src/main/dist/conf/bridge.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Pravega Properties
controllerUri=tcp://127.0.0.1:9090
scope=examples
stream=mqtt-example
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a sample, writing to a single stream is ok, but in general, isn't it possible that we have use cases in which the same bridge writes to different streams, perhaps inferring the stream from the event data?

scaling.targetRate=100
scaling.scaleFactor=3
scaling.minNumSegments=3

# MQTT Properties
brokerUri=tcp://127.0.0.1:1883
topic=center/#
clientId=CanDataReader
userName=admin
password=password
13 changes: 13 additions & 0 deletions scenarios/mqtt-pravega-bridge/src/main/dist/conf/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
log4j.rootLogger=INFO, stdout, file

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=workspace.log
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
24 changes: 24 additions & 0 deletions scenarios/mqtt-pravega-bridge/src/main/dist/conf/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove year from copyright statement.


Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

-->
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<encoder>
<pattern>%-5level [%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %logger{36}: %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>
32 changes: 32 additions & 0 deletions scenarios/mqtt-pravega-bridge/src/main/dist/conf/logger.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>logFile.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- daily rollover -->
<fileNamePattern>workspace.%d{yyyy-MM-dd}.log</fileNamePattern>

<!-- keep 1 days' worth of history -->
<maxHistory>5</maxHistory>
</rollingPolicy>

<encoder>
<pattern>%d{HH:mm:ss.SSS} %-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>
</encoder>
</appender>

<root level="info" additivity="false">
<appender-ref ref="STDOUT"/>
<appender-ref ref="FILE"/>
</root>


</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.pravega.example.mqtt;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing license header.


import com.google.common.base.Preconditions;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

public class ApplicationArguments {

private final PravegaArgs pravegaArgs = new PravegaArgs();
private final MqttArgs mqttArgs = new MqttArgs();

public ApplicationArguments(String confDir) throws Exception {
loadProperties(confDir);
}

private void loadProperties(String confDir) throws Exception{
Properties prop = new Properties();
try (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: In Pravega core, the style of these statements is in one line.

InputStream inputStream = new FileInputStream(confDir + File.separator + "bridge.properties");
)
{
prop.load(inputStream);

pravegaArgs.controllerUri = prop.getProperty("controllerUri");
pravegaArgs.scope = prop.getProperty("scope");
pravegaArgs.stream = prop.getProperty("stream");
pravegaArgs.targetRate = Integer.parseInt(prop.getProperty("scaling.targetRate"));
pravegaArgs.scaleFactor = Integer.parseInt(prop.getProperty("scaling.scaleFactor"));
pravegaArgs.minNumSegments = Integer.parseInt(prop.getProperty("scaling.minNumSegments"));

mqttArgs.brokerUri = prop.getProperty("brokerUri");
mqttArgs.topic = prop.getProperty("topic");
mqttArgs.clientId = prop.getProperty("clientId");
mqttArgs.userName = prop.getProperty("userName");
mqttArgs.password = prop.getProperty("password");

Preconditions.checkNotNull(pravegaArgs.controllerUri, "Pravega Controller URI is missing");
Preconditions.checkNotNull(pravegaArgs.scope, "Pravega scope is missing");
Preconditions.checkNotNull(pravegaArgs.stream, "Pravega stream is missing");

Preconditions.checkNotNull(mqttArgs.brokerUri, "MQTT Broker URI is missing");
Preconditions.checkNotNull(mqttArgs.topic, "MQTT topic is missing");
Preconditions.checkNotNull(mqttArgs.clientId, "MQTT clientId is missing");
Preconditions.checkNotNull(mqttArgs.userName, "MQTT userName is missing");
Preconditions.checkNotNull(mqttArgs.password, "MQTT password is missing");
}
}

public PravegaArgs getPravegaArgs() {
return pravegaArgs;
}

public MqttArgs getMqttArgs() {
return mqttArgs;
}

public static class PravegaArgs {
protected String controllerUri;
protected String scope;
protected String stream;
protected int targetRate;
protected int scaleFactor;
protected int minNumSegments;
}

public static class MqttArgs {
protected String brokerUri;
protected String topic;
protected String clientId;
protected String userName;
protected String password;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.pravega.example.mqtt;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing license header.


import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;

public class ApplicationMain {

private static Logger log = LoggerFactory.getLogger( ApplicationMain.class );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: extra spaces within parenthesis.


public static void main(String ... args) {

if (args.length != 1) {
log.error("Missing required arguments. Usage: java io.pravega.example.mqtt.ApplicationMain <CONF_DIR_PATH>");
return;
}

String confDir = args[0];
log.info("loading configurations from {}", confDir);

final CountDownLatch latch = new CountDownLatch(1);

try {
ApplicationArguments applicationArguments = new ApplicationArguments(confDir);
MqttListener listener = new MqttListener(applicationArguments.getPravegaArgs());

MqttConnectionBuilder builder = new MqttConnectionBuilder();
builder.brokerUri(applicationArguments.getMqttArgs().brokerUri);
builder.topic(applicationArguments.getMqttArgs().topic);
builder.clientId(applicationArguments.getMqttArgs().clientId);
builder.userName(applicationArguments.getMqttArgs().userName);
builder.password(applicationArguments.getMqttArgs().password);
builder.bridge(listener);

MqttClient mqttClient = builder.connect();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("Going to close the application");
if (mqttClient != null) {
try {
mqttClient.close();
} catch (MqttException e) {
log.error("Exception Occurred while closing MQTT client", e);
}
}
latch.countDown();
}));
} catch (Exception e) {
log.error("Exception Occurred", e);
}

}
}
Loading