
Commit 0143aaf

Initial commit.
1 parent c7b4fa7 commit 0143aaf

7 files changed: +194, -1 lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
@@ -157,4 +157,6 @@ cython_debug/
 # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
-#.idea/
+.idea/
+
+parquet/

README.md

Lines changed: 24 additions & 0 deletions
Offline Feature Extraction with Spark and Regular Expressions
----

This project demonstrates the scalability of Spark, and its ability to perform offline feature extraction, using the Wikipedia dataset. This allows the user of the scripts to find articles that match the supplied regular expressions, for use in training machine learning models.

The dataset can be found here:
https://huggingface.co/datasets/wikimedia/wikipedia

Three scripts are provided:

1. `download_wikipedia.py` - This script downloads the English Wikipedia dataset as a single file.
2. `repartition.py` - This script uses Spark to repartition the data into convenient ~512 MB Parquet files with Snappy compression.
3. `scan.py` - This script uses Spark to add a regular-expression category column to the dataset, filter out rows that lack any matches, and save the result to disk.

Note:

The driver/executor configuration is hardcoded in the repartition and scan scripts. It should likely be driven by a configuration file instead.

Future Enhancements:

1. Remove unnecessary columns to save on storage space.
2. Replace the UDF with a flatMap to reduce the time spent in the `filter` step (see the sketch after the scan.py listing below).
3. Source a larger dataset and benchmark the performance of the scripts.
4. Load driver/executor settings from a configuration file instead of hard-coding them (a minimal sketch follows this list).
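
A minimal sketch of how the driver/executor settings could be loaded from a configuration file, per Future Enhancement 4. The file name spark_config.json and its keys are hypothetical choices for illustration, not part of this commit:

import json

from pyspark.sql import SparkSession

# Hypothetical config file, e.g.:
# {"master": "spark://processing:7077", "driver_memory": "16g", "driver_cores": "1",
#  "executor_memory": "32g", "executor_instances": "1", "executor_cores": "1"}
with open("spark_config.json") as f:
    cfg = json.load(f)

spark = SparkSession.builder \
    .master(cfg["master"]) \
    .config("spark.driver.memory", cfg["driver_memory"]) \
    .config("spark.driver.cores", cfg["driver_cores"]) \
    .config("spark.executor.memory", cfg["executor_memory"]) \
    .config("spark.executor.instances", cfg["executor_instances"]) \
    .config("spark.executor.cores", cfg["executor_cores"]) \
    .appName('RepartitionFile') \
    .getOrCreate()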

download_wikipedia.py

Lines changed: 11 additions & 0 deletions
from datasets import load_dataset
import pandas as pd

# Load the dataset; note this dataset is quite large, so make sure you have enough free disk space
ds = load_dataset("wikimedia/wikipedia", "20231101.en")

# Convert the dataset to a Pandas DataFrame
df = pd.DataFrame(ds['train'])

# Save the DataFrame as a Parquet file
df.to_parquet('parquet/input/wikipedia_20231101.parquet', engine='fastparquet')
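
A possible alternative (a sketch, not what the script above does): recent releases of the datasets library expose Dataset.to_parquet, which would write the split straight to Parquet without materialising the full pandas DataFrame in memory first:

from datasets import load_dataset

# Assumes a datasets release that provides Dataset.to_parquet
ds = load_dataset("wikimedia/wikipedia", "20231101.en")
ds["train"].to_parquet("parquet/input/wikipedia_20231101.parquet")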

repartition.py

Lines changed: 39 additions & 0 deletions
from pyspark.sql import SparkSession
from pyspark import StorageLevel

import math

# Initialize Spark session
spark = SparkSession.builder \
    .master('spark://processing:7077') \
    .config("spark.driver.memory", "16g") \
    .config("spark.driver.cores", "1") \
    .config("spark.executor.memory", "32g") \
    .config("spark.executor.instances", "1") \
    .config("spark.executor.cores", "1") \
    .appName('RepartitionFile') \
    .getOrCreate()

# File size used to compute the repartition target, plus input/output file paths
file_size_gb = 11  # Approximate size of the input Parquet file; should probably be read from disk instead
file_size_mb = file_size_gb * 1024
target_partition_size_mb = 512

input_path = "parquet/input/wikipedia_20231101.parquet"
output_path = "parquet/input/repartitioned_data"

# Load the input parquet file
df = spark.read.parquet(input_path)
df.persist(StorageLevel.MEMORY_AND_DISK)

# Calculate the number of partitions
num_partitions = math.ceil(file_size_mb / target_partition_size_mb)

# Repartition the dataframe; note this doesn't execute until the write below, since Spark dataframes are evaluated lazily
repartitioned_df = df.repartition(num_partitions)

# Write the repartitioned dataframe to parquet files
repartitioned_df.write.mode("overwrite").parquet(output_path)

# Stop the Spark session; this frees resources in the Spark cluster and clearly marks the end of the job
spark.stop()
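
The script's own comment notes that the input size should probably be read from disk rather than hardcoded at 11 GB. A minimal sketch of that idea, reusing the paths and target size above, and assuming the input is the single Parquet file written by download_wikipedia.py:

import math
import os

input_path = "parquet/input/wikipedia_20231101.parquet"
target_partition_size_mb = 512

# If the input were a directory of part files, the individual file sizes would need to be summed instead
file_size_mb = os.path.getsize(input_path) / (1024 * 1024)
num_partitions = max(1, math.ceil(file_size_mb / target_partition_size_mb))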

requirements.txt

Lines changed: 11 additions & 0 deletions
# PySpark
pyspark

# ML
datasets
pandas

# Parquet
fastparquet  # used by the download wikipedia script to save the dataset
parquet-tools  # this is used for CLI reading/inspection of parquet files

scan.py

Lines changed: 51 additions & 0 deletions
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, size
from pyspark.sql.types import StringType, ArrayType
from pyspark import StorageLevel
from utils import regex_patterns

import re

spark = SparkSession.builder \
    .master('spark://processing:7077') \
    .config("spark.driver.memory", "12g") \
    .config("spark.driver.cores", "1") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "1") \
    .appName('Scan') \
    .getOrCreate()

# Compile the combined regex patterns; this significantly speeds up matching, and (?i) makes it case-insensitive
compiled_patterns = {category: re.compile("(?i)" + "|".join(patterns)) for category, patterns in regex_patterns.items()}

# Define a UDF to classify text based on the compiled regex patterns
def classify_text(text):
    matched_categories = []
    for category, pattern in compiled_patterns.items():
        if pattern.search(text):
            matched_categories.append(category)
    return matched_categories if matched_categories else None

classify_text_udf = udf(classify_text, ArrayType(StringType()))

# Load the input parquet files
input_df = spark.read.parquet("parquet/input/repartitioned_data/*.parquet")

# Persist to memory and disk so the job can still run successfully in limited-memory environments
input_df.persist(StorageLevel.MEMORY_AND_DISK)

# Apply the classification UDF to the text column
output_df = input_df.withColumn("categories", classify_text_udf(col("text")))

# Filter out rows with no categories
filtered_df = output_df.filter(col("categories").isNotNull() & (size(col("categories")) > 0))

# Select the required columns
result_df = filtered_df.select("title", "text", "categories")

# Save the result to a parquet file
result_df.write.mode("overwrite").parquet("parquet/output/results")

# Stop the Spark session
spark.stop()
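
A sketch of Future Enhancement 2 from the README: classify and filter in one pass with an RDD flatMap instead of a UDF followed by a separate filter. Column names and paths follow scan.py; the rest is illustrative, not the project's current implementation:

import re

from pyspark.sql import Row, SparkSession
from utils import regex_patterns

spark = SparkSession.builder.appName('ScanFlatMap').getOrCreate()

compiled_patterns = {category: re.compile("(?i)" + "|".join(patterns)) for category, patterns in regex_patterns.items()}

# Emit a row only when at least one category matches, so no separate filter step is needed
def classify_rows(row):
    categories = [category for category, pattern in compiled_patterns.items() if pattern.search(row["text"])]
    if categories:
        yield Row(title=row["title"], text=row["text"], categories=categories)

input_df = spark.read.parquet("parquet/input/repartitioned_data/*.parquet")
result_df = input_df.rdd.flatMap(classify_rows).toDF()
result_df.write.mode("overwrite").parquet("parquet/output/results")

spark.stop()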

utils.py

Lines changed: 55 additions & 0 deletions
# RegEx Patterns generated by ChatGPT for electronic music
regex_patterns = {
    "house": [
        "\\bdeep\\s+house\\b",
        "\\btech\\s+house\\b",
        "\\bprogressive\\s+house\\b",
        "\\btropical\\s+house\\b",
        "\\bfuture\\s+house\\b",
        "\\bacid\\s+house\\b",
        "\\belectro\\s+house\\b"
    ],
    "techno": [
        "\\bminimal\\s+techno\\b",
        "\\bdetroit\\s+techno\\b",
        "\\bacid\\s+techno\\b",
        "\\bindustrial\\s+techno\\b",
        "\\bhard\\s+techno\\b",
        "\\btech\\s+techno\\b"
    ],
    "trance": [
        "\\bprogressive\\s+trance\\b",
        "\\buplifting\\s+trance\\b",
        "\\bpsychedelic\\s+trance\\b",
        "\\bgoa\\s+trance\\b",
        "\\btech\\s+trance\\b",
        "\\bvocal\\s+trance\\b"
    ],
    "dubstep": [
        "\\bbrostep\\b",
        "\\bchillstep\\b",
        "\\briddim\\b",
        "\\bdeep\\s+dubstep\\b"
    ],
    "drum_and_bass": [
        "\\bdnb\\b",
        "\\bneurofunk\\b",
        "\\bliquid\\s+funk\\b",
        "\\bjump\\s+up\\b",
        "\\bdarkstep\\b",
        "\\bbreakcore\\b"
    ],
    "electro": [
        "\\belectro\\b",
        "\\belectro\\s+funk\\b",
        "\\belectro\\s+clash\\b",
        "\\bfuture\\s+electro\\b"
    ],
    "hardcore": [
        "\\bhardcore\\b",
        "\\bgabber\\b",
        "\\bhappy\\s+hardcore\\b",
        "\\bdigital\\s+hardcore\\b",
        "\\bbreakbeat\\s+hardcore\\b"
    ]
}
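
A quick local check (a sketch run from the repo root, not part of the commit) of how scan.py's combined, case-insensitive patterns behave on a sample sentence:

import re

from utils import regex_patterns

combined = {category: re.compile("(?i)" + "|".join(patterns)) for category, patterns in regex_patterns.items()}

sample = "Goa Trance and Acid House both emerged in the late 1980s."
print([category for category, pattern in combined.items() if pattern.search(sample)])  # ['house', 'trance']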
