from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, to_timestamp, col, expr
from pyspark.sql.types import StructType, StructField, StringType

from lib.logger import Log4j

# Timestamp layout used by both event feeds (see data/events.txt).
TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss"
KAFKA_BOOTSTRAP = "localhost:9092"


def _read_kafka_stream(spark, topic):
    """Return a raw streaming DataFrame for `topic` from the local Kafka broker.

    Both source streams share identical connection options, so the
    configuration lives in one place instead of being duplicated per topic.
    """
    return spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP) \
        .option("subscribe", topic) \
        .option("startingOffsets", "earliest") \
        .load()


def _parse_stream(kafka_df, schema, id_alias, time_alias):
    """Decode the Kafka `value` as JSON and normalize columns.

    Renames `InventoryID` to `id_alias` (so the two join sides have distinct
    column names) and converts the string `CreatedTime` into a proper
    timestamp column named `time_alias`, dropping the raw string afterwards.
    """
    return kafka_df \
        .select(from_json(col("value").cast("string"), schema).alias("value")) \
        .select(col("value.*")) \
        .withColumnRenamed("InventoryID", id_alias) \
        .withColumn(time_alias, to_timestamp(col("CreatedTime"), TIMESTAMP_FORMAT)) \
        .drop("CreatedTime")


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("Stream Stream Join Demo") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    logger = Log4j(spark)

    # Impressions carry a campaigner; clicks only echo the inventory id.
    impressionSchema = StructType([
        StructField("InventoryID", StringType()),
        StructField("CreatedTime", StringType()),
        StructField("Campaigner", StringType())
    ])

    clickSchema = StructType([
        StructField("InventoryID", StringType()),
        StructField("CreatedTime", StringType())
    ])

    impressions_df = _parse_stream(
        _read_kafka_stream(spark, "impressions"),
        impressionSchema, "ImpressionID", "ImpressionTime")

    clicks_df = _parse_stream(
        _read_kafka_stream(spark, "clicks"),
        clickSchema, "ClickID", "ClickTime")

    # NOTE(review): inner stream-stream join with NO watermark on either side —
    # Spark must buffer all past rows of both streams, so join state grows
    # without bound. For production, add withWatermark() on ImpressionTime and
    # ClickTime plus a time-range join condition; left as-is here because a
    # watermark would change which late events match.
    join_expr = "ImpressionID == ClickID"
    join_type = "inner"

    joined_df = impressions_df.join(clicks_df, expr(join_expr), join_type)

    output_query = joined_df.writeStream \
        .format("console") \
        .outputMode("append") \
        .option("checkpointLocation", "chk-point-dir") \
        .trigger(processingTime="1 minute") \
        .start()

    logger.info("Waiting for Query")
    output_query.awaitTermination()
class Log4j:
    """Thin convenience wrapper around the JVM-side log4j logger of a Spark app.

    The logger name is built as ``<root>.<app name>`` so application log lines
    can be routed independently of Spark's own logging (see log4j.properties).
    """

    def __init__(self, spark):
        # Reach through the Py4J gateway to the JVM's log4j package.
        jvm_log4j = spark._jvm.org.apache.log4j

        root_class = "guru.learningjournal.spark.examples"
        app_name = spark.sparkContext.getConf().get("spark.app.name")

        self.logger = jvm_log4j.LogManager.getLogger(f"{root_class}.{app_name}")

    def warn(self, message):
        """Log *message* at WARN level."""
        self.logger.warn(message)

    def info(self, message):
        """Log *message* at INFO level."""
        self.logger.info(message)

    def error(self, message):
        """Log *message* at ERROR level."""
        self.logger.error(message)

    def debug(self, message):
        """Log *message* at DEBUG level."""
        self.logger.debug(message)