-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata_stream.py
134 lines (110 loc) · 4.65 KB
/
data_stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import logging
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as psf
# TODO Create a schema for incoming resources
"""
From the StructField documentation:
StructField(name, dataType, nullable)
:param name: the name of this field.
:param dataType: the data type of this field.
:param nullable: indicates whether values of this field can be null.
"""
schema = StructType(
[
StructField("crime_id", StringType(), True),
StructField("original_crime_type_name", StringType(), True),
StructField("report_date", StringType(), True),
StructField("call_date", StringType(), True),
StructField("offense_date", StringType(), True),
StructField("call_time", StringType(), True),
StructField("call_date_time", StringType(), True),
StructField("disposition", StringType(), True),
StructField("address", StringType(), True),
StructField("city", StringType(), True),
StructField("state", StringType(), True),
StructField("agency_id", StringType(), True),
StructField("address_type", StringType(), True),
StructField("common_location", StringType(), True)
]
)
def run_spark_job(spark):
# TODO Create Spark Configuration
# Create Spark configurations with max offset of 200 per trigger
# set up correct bootstrap server and port
logger.debug("Creating Spark Configuration..")
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "calls") \
.option("startingOffsets", "earliest") \
.option("maxRatePerPartition", 100) \
.option("maxOffsetPerTrigger", 200) \
.load()
# Show schema for the incoming resources for checks
df.printSchema()
# TODO extract the correct column from the kafka input resources
# Take only value and convert it to String
logger.debug("Extracting column of interest..")
kafka_df = df.selectExpr("CAST(value AS STRING)")
service_table = kafka_df \
.select(psf.from_json(psf.col('value'), schema).alias("SERVICE")) \
.select("SERVICE.*")
# TODO select original_crime_type_name and disposition
logger.debug("Selecting original_crime_type_name and disposition..")
distinct_table = service_table.select(
psf.to_timestamp(psf.col("call_date_time")).alias("call_date_time"),
psf.col("original_crime_type_name"),
psf.col("disposition")
)
distinct_table.printSchema()
# count the number of original crime type
logger.debug("Counting the number of crimes..")
agg_df = distinct_table \
.select(
distinct_table.call_date_time,
distinct_table.original_crime_type_name,
distinct_table.disposition
) \
.withWatermark("call_date_time", "60 minutes") \
.groupBy(
psf.window(distinct_table.call_date_time, "10 minutes"),
psf.col("original_crime_type_name")
) \
.count()
# TODO Q1. Submit a screen shot of a batch ingestion of the aggregation
# TODO write output stream
print("Writing output stream..")
query = agg_df \
.writeStream \
.format("console") \
.outputMode("complete") \
.start()
# TODO attach a ProgressReporter
query.awaitTermination()
# TODO get the right radio code json path
radio_code_json_filepath = "radio_code.json"
radio_code_df = spark.read.json(radio_code_json_filepath)
# clean up your data so that the column names match on radio_code_df and agg_df
# we will want to join on the disposition code
# TODO rename disposition_code column to disposition
radio_code_df = radio_code_df.withColumnRenamed("disposition_code", "disposition").collect()
# TODO join on disposition column
join_query = agg_df.join(radio_code_df,
col("agg_df.disposition") == col("radio_code_df.disposition"),
"left_outer")
join_query.awaitTermination()
if __name__ == "__main__":
logger = logging.getLogger(__name__)
# TODO Create Spark in Standalone mode
spark = SparkSession \
.builder \
.config("spark.ui.port", 3000) \
.master("local[*]") \
.appName("KafkaSparkStructuredStreaming") \
.getOrCreate()
logger.info("Spark started")
run_spark_job(spark)
spark.stop()