
Commit

Spark Streaming
trajanov committed Oct 10, 2024
1 parent c9c18e0 commit ce8d257
Showing 4 changed files with 1,633 additions and 0 deletions.
141 changes: 141 additions & 0 deletions Notebooks/Spark-Example-Steaming/gdelt-stream.py
@@ -0,0 +1,141 @@
# GDELT (https://www.gdeltproject.org/) is a global event database that extracts events
# from news articles around the world.
# The dataset is updated every 15 minutes and is distributed as CSV files.
# Each record describes an event: its date, the actors involved, its location, and more.
# The latest batch is announced at http://data.gdeltproject.org/gdeltv2/lastupdate.txt

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, desc, count, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Initialize Spark session

spark = (
    SparkSession
    .builder
    .appName("GDELT CSV Streaming Processing")
    .config("spark.sql.adaptive.enabled", "false")
    .config("spark.sql.shuffle.partitions", 8)
    .master("local[*]")
    .getOrCreate()
)
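# Note: spark.sql.shuffle.partitions is lowered from the default of 200 so each
# micro-batch does not fan out into hundreds of tiny shuffle tasks on a single machine;
# adaptive query execution is disabled, presumably because it targets batch queries
# rather than streaming plans.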

# Define schema for GDELT CSV files
schema = StructType([
StructField("GLOBALEVENTID", StringType(), True),
StructField("SQLDATE", StringType(), True),
StructField("MonthYear", StringType(), True),
StructField("Year", StringType(), True),
StructField("FractionDate", StringType(), True),
StructField("Actor1Code", StringType(), True),
StructField("Actor1Name", StringType(), True),
StructField("Actor1CountryCode", StringType(), True),
StructField("Actor1KnownGroupCode", StringType(), True),
StructField("Actor1EthnicCode", StringType(), True),
StructField("Actor1Religion1Code", StringType(), True),
StructField("Actor1Religion2Code", StringType(), True),
StructField("Actor1Type1Code", StringType(), True),
StructField("Actor1Type2Code", StringType(), True),
StructField("Actor1Type3Code", StringType(), True),
StructField("Actor2Code", StringType(), True),
StructField("Actor2Name", StringType(), True),
StructField("Actor2CountryCode", StringType(), True),
StructField("Actor2KnownGroupCode", StringType(), True),
StructField("Actor2EthnicCode", StringType(), True),
StructField("Actor2Religion1Code", StringType(), True),
StructField("Actor2Religion2Code", StringType(), True),
StructField("Actor2Type1Code", StringType(), True),
StructField("Actor2Type2Code", StringType(), True),
StructField("Actor2Type3Code", StringType(), True),
StructField("IsRootEvent", StringType(), True),
StructField("EventCode", StringType(), True),
StructField("EventBaseCode", StringType(), True),
StructField("EventRootCode", StringType(), True),
StructField("QuadClass", StringType(), True),
StructField("GoldsteinScale", StringType(), True),
StructField("NumMentions", StringType(), True),
StructField("NumSources", StringType(), True),
StructField("NumArticles", StringType(), True),
StructField("AvgTone", StringType(), True),
StructField("Actor1Geo_Type", StringType(), True),
StructField("Actor1Geo_FullName", StringType(), True),
StructField("Actor1Geo_CountryCode", StringType(), True),
StructField("Actor1Geo_ADM1Code", StringType(), True),
StructField("Actor1Geo_Lat", StringType(), True),
StructField("Actor1Geo_Long", StringType(), True),
StructField("Actor1Geo_FeatureID", StringType(), True),
StructField("Actor2Geo_Type", StringType(), True),
StructField("Actor2Geo_FullName", StringType(), True),
StructField("Actor2Geo_CountryCode", StringType(), True),
StructField("Actor2Geo_ADM1Code", StringType(), True),
StructField("Actor2Geo_Lat", StringType(), True),
StructField("Actor2Geo_Long", StringType(), True),
StructField("Actor2Geo_FeatureID", StringType(), True),
StructField("ActionGeo_Type", StringType(), True),
StructField("ActionGeo_FullName", StringType(), True),
StructField("ActionGeo_CountryCode", StringType(), True),
StructField("ActionGeo_ADM1Code", StringType(), True),
StructField("ActionGeo_Lat", StringType(), True),
StructField("ActionGeo_Long", StringType(), True),
StructField("ActionGeo_FeatureID", StringType(), True),
StructField("DATEADDED", StringType(), True),
StructField("SOURCEURL", StringType(), True)
])

# Define the source of the streaming data
input_path = "input_files"  # Directory watched for new GDELT CSV files
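# Spark's file source treats this directory as an unbounded stream: every new CSV
# that lands here (e.g. dropped in by gdelt_update.py) is processed as a micro-batch.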

# Read streaming data from CSV files
df = spark.readStream \
.option("header", "false") \
.option("delimiter", "\t") \
.schema(schema) \
.csv(input_path)
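# GDELT export files are tab-separated with no header row, so the explicit schema
# above is required to name the columns.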

# Cast the numeric columns (read as strings) to proper types
df = df.withColumn("GoldsteinScale", col("GoldsteinScale").cast("float")) \
       .withColumn("NumMentions", col("NumMentions").cast("int"))

# Load country code mapping file
country_mapping_path = "country_mapping.csv" # Path to country mapping CSV file
country_schema = StructType([
StructField("CountryCode", StringType(), True),
StructField("CountryName", StringType(), True)
])
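# Assumed layout of country_mapping.csv (illustrative; the file itself is not part
# of this commit). Actor1CountryCode holds CAMEO-style three-letter codes, e.g.:
#   CountryCode,CountryName
#   USA,United States
#   CHN,China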

country_mapping_df = spark.read \
.option("header", "true") \
.schema(country_schema) \
.csv(country_mapping_path)

# Join the GDELT data with the country mapping to get country names
joined_df = df.join(country_mapping_df, df.Actor1CountryCode == country_mapping_df.CountryCode, "left")
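# This is a stream-static join: the small static lookup table is joined to every
# micro-batch of the stream; no watermark is needed for this join type.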

# Group by country name and compute the average GoldsteinScale, the number of
# events, and the total number of mentions
country_goldstein_df = joined_df.groupBy("CountryName") \
    .agg(
        avg("GoldsteinScale").alias("AverageGoldsteinScale"),
        count("GLOBALEVENTID").alias("NumberOfEvents"),
        spark_sum("NumMentions").alias("SumNumMentions")
    )

# Get the top 10 most positive and 10 most negative countries
most_positive_countries = country_goldstein_df.orderBy(desc("AverageGoldsteinScale")).limit(10)
most_negative_countries = country_goldstein_df.orderBy("AverageGoldsteinScale").limit(10)
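# Sorting a streaming DataFrame is only allowed over aggregated results in complete
# output mode, which is why both queries below use outputMode("complete").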


# Write the results to the console in real-time
query_positive = most_positive_countries.writeStream \
.outputMode("complete") \
.format("console") \
.option("truncate", "false") \
.start()

# Optionally, add .option("checkpointLocation", "checkpoint") before .start() to
# persist progress across restarts (each query needs its own checkpoint directory).
query_negative = most_negative_countries.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query_positive.awaitTermination()
query_negative.awaitTermination()
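# To try this end to end (assumed invocation): run gdelt_update.py in a separate
# terminal to populate input_files/, then start this job, e.g. with
#   spark-submit gdelt-stream.py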
71 changes: 71 additions & 0 deletions Notebooks/Spark-Example-Steaming/gdelt_update.py
@@ -0,0 +1,71 @@
# GDELT (https://www.gdeltproject.org/) is a global event database that extracts events
# from news articles around the world.
# The dataset is updated every 15 minutes and is distributed as CSV files.
# Each record describes an event: its date, the actors involved, its location, and more.
# The latest batch is announced at http://data.gdeltproject.org/gdeltv2/lastupdate.txt
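# lastupdate.txt normally lists the three most recent zip files (export, mentions,
# gkg), one per line, in the form "<size> <md5> <url>". Illustrative line with
# made-up values:
#   150383 2bb7c9... http://data.gdeltproject.org/gdeltv2/20241010124500.export.CSV.zip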

import requests
import os
import re
import time
import zipfile

# Define URL for GDELT last update file
lastupdate_url = "http://data.gdeltproject.org/gdeltv2/lastupdate.txt"

# Define the input path where the CSV file should be saved
input_path = "input_files"
os.makedirs(input_path, exist_ok=True)  # make sure the download directory exists

# Function to check and download the new export.csv file
def check_and_download_file():
try:
# Fetch the last update information
response = requests.get(lastupdate_url)
response.raise_for_status()
last_update_content = response.text.strip()

# Use regex to find the URL of the export CSV file
match = re.search(r'\bhttp://data\.gdeltproject\.org/gdeltv2/\d+\.export\.CSV\.zip\b', last_update_content)
if match:
csv_url = match.group(0)

# Extract file name from URL
file_name = csv_url.split("/")[-1]
dest_path = os.path.join(input_path, file_name)

# Check if the file already exists
if not os.path.exists(dest_path):
download_file(csv_url, dest_path)
unzip_file(dest_path, input_path)
else:
print("File already exists at", dest_path)
except requests.exceptions.RequestException as e:
print("Error fetching last update info:", e)

# Function to download the file from the given URL
def download_file(url, dest_path):
try:
response = requests.get(url, stream=True)
response.raise_for_status()

# Stream the body to disk in 1 KB chunks so large zips are never held fully in memory
with open(dest_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=1024):
f.write(chunk)
print("Downloaded file to", dest_path)
except requests.exceptions.RequestException as e:
print("Error downloading the file:", e)

# Function to unzip the downloaded file
def unzip_file(zip_path, extract_to):
try:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_to)
print("Unzipped file to", extract_to)
except zipfile.BadZipFile as e:
print("Error unzipping the file:", e)

if __name__ == "__main__":
while True:
check_and_download_file()
time.sleep(300) # Wait for 5 minutes before checking again
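# GDELT publishes a new export roughly every 15 minutes; polling every 5 minutes
# means most checks are no-ops, but a fresh file is picked up soon after release.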