Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
LearningJournal committed Aug 27, 2020
1 parent 07a8ae0 commit 58037be
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 0 deletions.
70 changes: 70 additions & 0 deletions 12-StreamStreamJoinDemo/StreamStreamJoinDemo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, to_timestamp, col, expr
from pyspark.sql.types import StructType, StructField, StringType

from lib.logger import Log4j

if __name__ == "__main__":
    # Demo: join a stream of ad impressions with a stream of ad clicks,
    # both read from Kafka, keyed on the inventory id.
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("Stream Stream Join Demo") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    logger = Log4j(spark)

    # Schema of the JSON payload on the "impressions" topic.
    impressionSchema = StructType([
        StructField("InventoryID", StringType()),
        StructField("CreatedTime", StringType()),
        StructField("Campaigner", StringType())
    ])

    # Schema of the JSON payload on the "clicks" topic.
    clickSchema = StructType([
        StructField("InventoryID", StringType()),
        StructField("CreatedTime", StringType())
    ])

    kafka_impression_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "impressions") \
        .option("startingOffsets", "earliest") \
        .load()

    # Parse the Kafka value as JSON, rename the key column, and derive a real
    # timestamp column so the stream can carry a watermark.
    # BUG FIX: the original join carried no watermarks and no event-time bound,
    # so Spark had to buffer the join state of BOTH streams forever (unbounded
    # state growth). Watermark each side and bound the join in event time.
    impressions_df = kafka_impression_df \
        .select(from_json(col("value").cast("string"), impressionSchema).alias("value")) \
        .selectExpr("value.InventoryID as ImpressionID", "value.CreatedTime", "value.Campaigner") \
        .withColumn("ImpressionTime", to_timestamp(col("CreatedTime"), "yyyy-MM-dd HH:mm:ss")) \
        .drop("CreatedTime") \
        .withWatermark("ImpressionTime", "30 minute")

    kafka_click_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "clicks") \
        .option("startingOffsets", "earliest") \
        .load()

    clicks_df = kafka_click_df.select(
        from_json(col("value").cast("string"), clickSchema).alias("value")) \
        .selectExpr("value.InventoryID as ClickID", "value.CreatedTime") \
        .withColumn("ClickTime", to_timestamp(col("CreatedTime"), "yyyy-MM-dd HH:mm:ss")) \
        .drop("CreatedTime") \
        .withWatermark("ClickTime", "30 minute")

    # Equality on the key plus an event-time range lets Spark age out old
    # state. The 1 hour window comfortably covers the sample data (clicks
    # arrive at most ~18 minutes after their impression).
    join_expr = "ImpressionID == ClickID " \
                "AND ClickTime BETWEEN ImpressionTime AND ImpressionTime + interval 1 hour"
    join_type = "inner"

    joined_df = impressions_df.join(clicks_df, expr(join_expr), join_type)

    # Console sink for demo purposes; the checkpoint directory persists Kafka
    # offsets and join state across restarts.
    output_query = joined_df.writeStream \
        .format("console") \
        .outputMode("append") \
        .option("checkpointLocation", "chk-point-dir") \
        .trigger(processingTime="1 minute") \
        .start()

    logger.info("Waiting for Query")
    output_query.awaitTermination()
10 changes: 10 additions & 0 deletions 12-StreamStreamJoinDemo/data/events.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"InventoryID": "100001", "CreatedTime": "2020-09-09 10:00:00", "Campaigner": "ABC Ltd"}
{"InventoryID": "100002", "CreatedTime": "2020-09-09 10:06:00", "Campaigner": "ABC Ltd"}
{"InventoryID": "100003", "CreatedTime": "2020-09-09 10:02:00", "Campaigner": "XYZ Ltd"}
{"InventoryID": "100004", "CreatedTime": "2020-09-09 10:09:00", "Campaigner": "XYZ Ltd"}

{"InventoryID": "100001", "CreatedTime": "2020-09-09 10:18:00"}
{"InventoryID": "100002", "CreatedTime": "2020-09-09 10:18:00"}
{"InventoryID": "100003", "CreatedTime": "2020-09-09 10:18:00"}
{"InventoryID": "100004", "CreatedTime": "2020-09-09 10:18:00"}
{"InventoryID": "100001", "CreatedTime": "2020-09-09 10:18:00"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REM Start a local ZooKeeper instance using the scripts bundled with Kafka.
REM Requires the KAFKA_HOME environment variable to point at the Kafka install.
%KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
1 change: 1 addition & 0 deletions 12-StreamStreamJoinDemo/kafka-scripts/02-start-kafka.cmd
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REM Start a single Kafka broker; run after ZooKeeper is up.
REM NOTE(review): uses server-0.properties rather than the stock server.properties —
REM presumably a broker config shipped with this course; confirm it exists under %KAFKA_HOME%\config.
%KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server-0.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REM Create the "impressions" topic (1 partition, replication factor 1 — single-broker demo).
REM NOTE(review): the --zookeeper flag was removed in Kafka 3.x; on newer Kafka use
REM --bootstrap-server localhost:9092 instead.
%KAFKA_HOME%\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic impressions
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REM Create the "clicks" topic (1 partition, replication factor 1 — single-broker demo).
REM NOTE(review): the --zookeeper flag was removed in Kafka 3.x; on newer Kafka use
REM --bootstrap-server localhost:9092 instead.
%KAFKA_HOME%\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic clicks
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REM Interactive console producer for the "impressions" topic — paste the JSON
REM records from data/events.txt here.
REM NOTE(review): --broker-list is deprecated in newer Kafka; prefer --bootstrap-server.
%KAFKA_HOME%\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic impressions
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
REM Interactive console producer for the "clicks" topic — paste the JSON
REM records from data/events.txt here.
REM NOTE(review): --broker-list is deprecated in newer Kafka; prefer --bootstrap-server.
%KAFKA_HOME%\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic clicks
Empty file.
21 changes: 21 additions & 0 deletions 12-StreamStreamJoinDemo/lib/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
class Log4j:
    """Thin wrapper over the JVM-side log4j exposed through a SparkSession.

    The underlying logger is named
    ``guru.learningjournal.spark.examples.<spark.app.name>`` so that the
    application's log lines can be routed separately from Spark's own output
    (see the accompanying log4j.properties).
    """

    def __init__(self, spark):
        # Reach through the Py4J gateway to the JVM's log4j package.
        jvm_log4j = spark._jvm.org.apache.log4j

        # Qualify the logger name with the running application's name.
        root_class = "guru.learningjournal.spark.examples"
        app_name = spark.sparkContext.getConf().get("spark.app.name")

        # the JVM Logger instance all methods below delegate to
        self.logger = jvm_log4j.LogManager.getLogger(root_class + "." + app_name)

    def warn(self, message):
        """Log *message* at WARN level."""
        self.logger.warn(message)

    def info(self, message):
        """Log *message* at INFO level."""
        self.logger.info(message)

    def error(self, message):
        """Log *message* at ERROR level."""
        self.logger.error(message)

    def debug(self, message):
        """Log *message* at DEBUG level."""
        self.logger.debug(message)
27 changes: 27 additions & 0 deletions 12-StreamStreamJoinDemo/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Set everything to be logged to the console
log4j.rootCategory=WARN, console

# define console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

#application log
log4j.logger.guru.learningjournal.spark.examples=INFO, console
log4j.additivity.guru.learningjournal.spark.examples=false

# To use this file, define the following Java system property when launching:
# -Dlog4j.configuration=file:log4j.properties

# Recommendations from Spark template
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

0 comments on commit 58037be

Please sign in to comment.