
 ____       _              ____                   _    
|  _ \ ___ | | __ _ _ __  / ___| _ __   __ _ _ __| | __
| |_) / _ \| |/ _` | '__| \___ \| '_ \ / _` | '__| |/ /
|  __/ (_) | | (_| | |     ___) | |_) | (_| | |  |   < 
|_|   \___/|_|\__,_|_|    |____/| .__/ \__,_|_|  |_|\_\
                                |_|                    

🚀 Apache Spark on Polars

Polar Spark brings the PySpark API to Polars, optimized for single-machine workloads.

It is designed as a drop-in replacement for PySpark in scenarios where a full Spark cluster is not needed. A common use case is running fast, lightweight unit tests in CI/CD pipelines 🧪.

Instead of relying on the JVM-based Spark engine, Polar Spark runs on Polars' Lazy API, powered by a high-performance Rust execution engine 🦀. This avoids the overhead of the JVM, which can be slow and heavy for small or local workloads.

By leveraging Polars, Polar Spark automatically benefits from:

  • 🚀 Advanced query optimization
  • 🧵 Efficient multithreading
  • 🖥️ Excellent performance on modern CPUs

🎯 Goal: Make Polar Spark a seamless PySpark replacement whenever workloads fit on a single machine or within local resource limits.

Installation

pip install polarspark==0.2.2a4
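
To verify the install, a minimal import check using the same module path as the examples below:

from polarspark.sql.session import SparkSession
print(SparkSession)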

Examples:

Spark session

try:
    from polarspark.sql.session import SparkSession
except ImportError:
    # Fall back to PySpark so the same code runs on either engine
    from pyspark.sql.session import SparkSession

spark = SparkSession.builder.master("local").appName("myapp").getOrCreate()

print(spark)
print(type(spark))

>>> <polarspark.sql.session.SparkSession object at 0x1043bdd90>
>>> <class 'polarspark.sql.session.SparkSession'>
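
Since the headline use case is fast unit tests in CI/CD, the session is typically shared through a test fixture. A minimal pytest sketch (the fixture is illustrative; stop() is assumed to be mapped from PySpark):

import pytest

try:
    from polarspark.sql.session import SparkSession
except ImportError:
    from pyspark.sql.session import SparkSession

@pytest.fixture(scope="module")
def spark():
    # One session per test module; getOrCreate() reuses an existing session
    session = SparkSession.builder.master("local").appName("tests").getOrCreate()
    yield session
    session.stop()  # assumed to mirror PySpark's SparkSession.stop()

def test_create_dataframe(spark):
    df = spark.createDataFrame([("Alice", 1)], schema="name STRING, age INT")
    assert df.count() == 1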

DataFrame API

try:
    from polarspark.sql import Row
    from polarspark.sql.types import *
except ImportError:
    from pyspark.sql import Row
    from pyspark.sql.types import *
from pprint import pprint
d = [{'name': 'Alice', 'age': 1}, 
     {'name': 'Tome', 'age': 100}, 
     {'name': 'Sim', 'age': 99}]
df = spark.createDataFrame(d)
rows = df.collect()
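
The usual transformations can then be chained on the result. A sketch, assuming polarspark also mirrors pyspark.sql.functions (only the imports shown above are confirmed by this README):

try:
    from polarspark.sql import functions as F  # assumed to mirror pyspark.sql.functions
except ImportError:
    from pyspark.sql import functions as F

(df.filter(F.col("age") > 50)
   .select("name", "age")
   .orderBy(F.col("age").desc())
   .show())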

SQL

spark.sql("CREATE TABLE input_table (value string) USING parquet")
spark.sql("INSERT INTO input_table VALUES (1), (2), (3)")

spark.sql("""
    SELECT * 
    FROM input_table i 
        JOIN my_table m 
    ON i.value = m.age
""").show()

API

pprint(rows)
>>> [Row(age=1, name='Alice'),
>>>  Row(age=100, name='Tome'),
>>>  Row(age=99, name='Sim')]
df.printSchema()
>>> root
>>>  |-- age: long (nullable = true)
>>>  |-- name: string (nullable = true)
# With schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
df_no_rows = spark.createDataFrame([], schema=schema)

print(df_no_rows.isEmpty())
>>> True
# or using Spark DDL
df = spark.createDataFrame([("Alice", 3), ("Ben", 5)], schema="name STRING, age INT")
print(df.isEmpty())
>>> False
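
The resulting schema can be inspected through the usual PySpark accessors (printSchema is shown above; schema and dtypes are assumed to be mapped the same way):

print(df.schema)   # StructType matching the DDL string above
print(df.dtypes)   # [('name', 'string'), ('age', 'int')] in PySpark; assumed here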

Read / write Parquet, Delta, CSV etc.

base_path = "/var/tmp"

# Passing a list of paths reads them all into one DataFrame
df1 = spark.read.format("json").load([f"{base_path}/data.json",
                                      f"{base_path}/data.json"])
df2 = spark.read.json([f"{base_path}/data.json",
                       f"{base_path}/data.json"])

df1.write.format("csv").save(f"{base_path}/data_json_to_csv.csv", mode="overwrite")

df1 = spark.read.format("csv").load([f"{base_path}/data_json_to_csv.csv",
                                     f"{base_path}/data_json_to_csv.csv"])

# Write Parquet first so the reads below have data to load
df1.write.format("parquet").save(f"{base_path}/data_json_to_parquet.parquet", mode="overwrite")

df1 = spark.read.format("parquet").load([f"{base_path}/data_json_to_parquet.parquet",
                                         f"{base_path}/data_json_to_parquet.parquet"])
df2 = spark.read.parquet(f"{base_path}/data_json_to_parquet.parquet",
                         f"{base_path}/data_json_to_parquet.parquet")

Streaming (Stateless)

df = spark.readStream.format("rate").load()
# Checkpoint directory is illustrative; any writable path works
q = df.writeStream.toTable("output_table", format="parquet",
                           checkpointLocation=f"{base_path}/checkpoint")
q.stop()
result = spark.sql("SELECT value FROM output_table").collect()

Streaming (foreachBatch)

def collectBatch(batch_df, batch_id):
    batch_df.write.format("parquet").mode("overwrite").saveAsTable("test_table1")

df = spark.readStream.format("text").load("polarspark/test_support/sql/streaming")
q = df.writeStream.foreachBatch(collectBatch).start()
q.processAllAvailable()
collected = spark.sql("SELECT * FROM test_table1").collect()

In-Memory Catalog

df.write.saveAsTable("my_table")
spark.sql("select * from my_table").show()

Some more:

Inspect and iterate

pprint(df.offset(1).first())
>>>  Row(age=100, name='Tome')
df.show()

shape: (3, 2)
┌─────┬──────────┐
│ age ┆ name     │
│ --- ┆ ---      │
│ i64 ┆ str      │
╞═════╪══════════╡
│ 1   ┆ Alice    │
│ 100 ┆ Tome     │
│ 99  ┆ Sim      │
└─────┴──────────┘
df.explain()
                 0
   ┌────────────────────────
   │
   │  ╭─────────────────────╮
   │  │ DF ["age", "name"]  │
 0 │  │ PROJECT */2 COLUMNS │
   │  ╰─────────────────────╯
print(repr(df))
>>>  DataFrame[age: bigint, name: string]
print(df.count())
>>>  3
def func(row):
    print("Row -> {}".format(row))

df.foreach(func)

df = spark.createDataFrame(
    [(14, "Tom"), (23, "Alice"), (16, "Bob"), (16, "Bob")], ["age", "name"]
)

def func(itr):
    for person in itr:
        print(person)
        print("Person -> {}".format(person.name))
df.foreachPartition(func)

df.show()
df.distinct().show()

NOTE: Some features are not mapped one-to-one but rely on Polars directly; e.g. df.show() and df.explain() print the output of the corresponding Polars methods.
