Skip to content

Commit

Permalink
initial import
Browse files Browse the repository at this point in the history
  • Loading branch information
hakanilter committed Mar 20, 2018
0 parents commit 237e990
Show file tree
Hide file tree
Showing 18 changed files with 784 additions and 0 deletions.
45 changes: 45 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# use glob syntax.
syntax: glob
*.ser
*.class
*~
*.bak
#*.off
*.old

# eclipse conf file
.settings
.classpath
.project
.manager
.scala_dependencies

# idea
.idea
*.iml

# building
target
build
null
tmp*
temp*
dist
test-output
build.log

# other scm
.svn
.CVS
.hg*

# switch to regexp syntax.
# syntax: regexp
# ^\.pc/

#SHITTY output not in target directory
build.log

.cache*

*.DS_Store
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Kafka Spark Streaming
An example project for integrating Kafka and Spark Streaming in order to run streaming sql queries.
247 changes: 247 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.datapyro</groupId>
<artifactId>kafka-spark-streaming</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
<!-- Build config -->
<maven.memory>1G</maven.memory>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<java.compiler.version>1.8</java.compiler.version>
<scala.version>2.11.8</scala.version>
<scala.tools.version>2.11</scala.tools.version>
<maven-compiler-plugin.version>2.3.2</maven-compiler-plugin.version>
<maven-scala-plugin.version>3.2.0</maven-scala-plugin.version>
<maven-surefire-plugin.version>2.12</maven-surefire-plugin.version>
<maven-shade-plugin.version>2.3</maven-shade-plugin.version>
<!-- Dependencies -->
<kafka.version>1.0.1</kafka.version>
<kafka-avro-serializer.version>3.3.0</kafka-avro-serializer.version>
<avro.version>1.8.1</avro.version>
<spark.version>2.3.0</spark.version>
<gson.version>2.6</gson.version>
<slf4j.version>1.7.25</slf4j.version>
<junit.version>4.10</junit.version>
</properties>

<dependencies>
<!-- Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>compile</scope>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${kafka-avro-serializer.version}</version>
</dependency>
<!-- Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.tools.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.tools.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.tools.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.tools.version}</artifactId>
<version>2.3.0</version>
</dependency>
<!-- Gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>${gson.version}</version>
</dependency>
<!-- Slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- Junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/main/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<compilerArguments>
<encoding>${project.build.sourceEncoding}</encoding>
</compilerArguments>
<source>${java.version}</source>
<target>${java.version}</target>
<compilerVersion>${java.compiler.version}</compilerVersion>
<optimize>true</optimize>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<!-- see http://davidb.github.com/scala-maven-plugin -->
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>${maven-scala-plugin.version}</version>
<executions>
<!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
<!-- Make Scala source code visible to Maven. This is required for correct work of Eclipse/Scala integration. -->
<execution>
<id>scala-add-source</id>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sourceDir>src/main/java</sourceDir>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit4</artifactId>
<version>${maven-surefire-plugin.version}</version>
</dependency>
</dependencies>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<!-- If you have classpath issue like NoDefClassError,... -->
<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
<excludes>
<exclude>**/*IntegrationTest.*</exclude>
<exclude>**/*FunctionalTest.*</exclude>
<exclude>**/*PerformanceTest.*</exclude>
</excludes>
<forkMode>always</forkMode>
<argLine>-Xms${maven.memory} -Xmx${maven.memory}</argLine>
<systemPropertyVariables>
<environment>test</environment>
<project.build.directory>${project.build.directory}</project.build.directory>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>make-jar</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>dist</shadedClassifierName>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
13 changes: 13 additions & 0 deletions scripts/create-topic.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

if [ -z "$KAFKA_HOME" ]
then
echo "Please set \$KAFKA_HOME environment variable properly"
exit
fi

TOPIC=network-data
PARTITIONS=4
REPLICATION=1

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic $TOPIC --partitions $PARTITIONS --replication-factor $REPLICATION
7 changes: 7 additions & 0 deletions scripts/kafka-setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash
KAFKA_VERSION=kafka_2.11-1.0.0
wget http://mirror.vorboss.net/apache/kafka/1.0.0/$KAFKA_VERSION.tgz
tar xvf $KAFKA_VERSION.tgz
INSTALL_DIR=$(pwd)
export KAFKA_HOME=$INSTALL_DIR/$KAFKA_VERSION
echo "export KAFKA_HOME=$KAFKA_HOME"
9 changes: 9 additions & 0 deletions scripts/start-kafka.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash

if [ -z "$KAFKA_HOME" ]
then
echo "Please set \$KAFKA_HOME environment variable properly"
exit
fi

$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
9 changes: 9 additions & 0 deletions scripts/start-zookeeper.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash

if [ -z "$KAFKA_HOME" ]
then
echo "Please set \$KAFKA_HOME environment variable properly"
exit
fi

$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
42 changes: 42 additions & 0 deletions src/main/java/com/datapyro/kafka/model/NetworkData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.datapyro.kafka.model;

import com.datapyro.kafka.util.JsonSerializable;

import java.util.List;

public class NetworkData extends JsonSerializable {

private static final long serialVersionUID = -7188517737272721658L;

private String deviceId;
private Long time;
private List<NetworkSignal> signals;

public String getDeviceId() {
return deviceId;
}

public NetworkData setDeviceId(String deviceId) {
this.deviceId = deviceId;
return this;
}

public Long getTime() {
return time;
}

public NetworkData setTime(Long time) {
this.time = time;
return this;
}

public List<NetworkSignal> getSignals() {
return signals;
}

public NetworkData setSignals(List<NetworkSignal> signals) {
this.signals = signals;
return this;
}

}
Loading

0 comments on commit 237e990

Please sign in to comment.