from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType

from lib.logger import Log4j
| 7 | + |
def write_to_cassandra(target_df, batch_id):
    """Sink one micro-batch into the Cassandra ``spark_db.users`` table.

    Signature matches the ``foreachBatch`` contract: the batch DataFrame
    plus its batch id (the id is not needed here). After persisting, the
    batch is echoed to stdout for demo visibility.
    """
    cassandra_writer = (
        target_df.write
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "spark_db")
        .option("table", "users")
        .mode("append")
    )
    cassandra_writer.save()
    target_df.show()
| 16 | + |
| 17 | + |
if __name__ == "__main__":
    # Local Spark session configured for Kafka source + Cassandra sink.
    spark = (
        SparkSession.builder
        .master("local[3]")
        .appName("Stream Table Join Demo")
        .config("spark.streaming.stopGracefullyOnShutdown", "true")
        .config("spark.sql.shuffle.partitions", 2)
        .config("spark.cassandra.connection.host", "localhost")
        .config("spark.cassandra.connection.port", "9042")
        .config("spark.sql.extensions",
                "com.datastax.spark.connector.CassandraSparkExtensions")
        .config("spark.sql.catalog.lh",
                "com.datastax.spark.connector.datasource.CassandraCatalog")
        .getOrCreate()
    )

    logger = Log4j(spark)

    # Shape of the JSON login events arriving on the Kafka topic.
    login_schema = StructType([
        StructField("created_time", StringType()),
        StructField("login_id", StringType()),
    ])

    kafka_source_df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "logins")
        .option("startingOffsets", "earliest")
        .load()
    )

    # Kafka delivers the payload as bytes: cast to string, parse as JSON.
    value_df = kafka_source_df.select(
        from_json(col("value").cast("string"), login_schema).alias("value")
    )

    # Flatten the parsed struct and convert created_time to a timestamp.
    login_df = value_df.select("value.*").withColumn(
        "created_time",
        to_timestamp(col("created_time"), "yyyy-MM-dd HH:mm:ss"),
    )

    # Static side of the stream-static join: users table from Cassandra.
    user_df = (
        spark.read
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "spark_db")
        .option("table", "users")
        .load()
    )

    # Inner join on login_id; keep the user_df copy of the key column.
    joined_df = (
        login_df
        .join(user_df, login_df.login_id == user_df.login_id, "inner")
        .drop(login_df.login_id)
    )

    output_df = joined_df.select(
        col("login_id"),
        col("user_name"),
        col("created_time").alias("last_login"),
    )

    # Deliver each micro-batch to Cassandra through foreachBatch.
    output_query = (
        output_df.writeStream
        .foreachBatch(write_to_cassandra)
        .outputMode("update")
        .option("checkpointLocation", "chk-point-dir")
        .trigger(processingTime="1 minute")
        .start()
    )

    logger.info("Waiting for Query")
    output_query.awaitTermination()
0 commit comments