Qbeast Spark is an extension for Data Lakehouses that enables multi-dimensional filtering and sampling directly on the storage
- Data Lakehouse - Data lake with ACID properties, thanks to the underlying Delta Lake architecture.
- Multi-column indexing - Filter your data with multiple columns using the Qbeast Format.
- Improved Sampling operator - Read statistically significant subsets of files.
- Table Tolerance - Model for sampling fraction and query accuracy trade-off.
As the comparison below shows, the Qbeast Spark extension allows faster queries with statistically accurate sampling.
| Format | Execution Time | Result |
|---|---|---|
| Delta | ~ 2.5 min. | 37.869383 |
| Qbeast | ~ 6.6 sec. | 37.856333 |
In this example, a 1% sample returns the result about 22x faster than the Delta format, with an error of only 0.034%.
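The kind of comparison query might look like the sketch below (the table path, the `price` column, and the aggregation are illustrative assumptions, not the actual benchmark code):

```scala
import org.apache.spark.sql.functions.avg

// Hypothetical 1% sampling query on a qbeast-formatted table.
// The sample is resolved at the file level, so only a small fraction of the data is read.
val df = spark.read.format("qbeast").load("/path/to/qbeast/table")
df.sample(0.01).agg(avg("price")).show()
```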
This project is in an early development phase: there are missing functionalities and the API might change drastically.
Join ⨝ the community to be a part of this project!
Check the Issues tab to see what is cooking 😎
You can run the qbeast-spark application locally on your computer, or use a Docker image we have already prepared with the dependencies; you can find it in the Packages section.
Download Spark 3.1.1 with Hadoop 3.2, extract it, and set the SPARK_HOME environment variable:
ℹ️ Note: You can use Hadoop 2.7 if desired, but you may run into trouble with some cloud providers' storage; read more about it here.
wget https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
tar xzvf spark-3.1.1-bin-hadoop3.2.tgz
export SPARK_HOME=$PWD/spark-3.1.1-bin-hadoop3.2
Inside the project folder, launch a Spark shell with the required dependencies:
$SPARK_HOME/bin/spark-shell \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--packages io.qbeast:qbeast-spark_2.12:0.2.0,io.delta:delta-core_2.12:1.0.0
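If you are adding Qbeast to an sbt project instead of passing `--packages` on the command line, the equivalent dependency coordinates would look roughly like this (a sketch, using the same versions as the command above):

```scala
// build.sbt (sketch): the same artifacts as the --packages option, built for Scala 2.12
libraryDependencies ++= Seq(
  "io.qbeast" %% "qbeast-spark" % "0.2.0",
  "io.delta"  %% "delta-core"   % "1.0.0"
)
```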
Read the CSV source file placed inside the project.
val csv_df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./src/test/resources/ecommerce100K_2019_Oct.csv")
Index the dataset by writing it into the qbeast format, specifying the columns to index.
val tmp_dir = "/tmp/qbeast-spark"
csv_df.write
.mode("overwrite")
.format("qbeast")
.option("columnsToIndex", "user_id,product_id")
.save(tmp_dir)
Load the newly indexed dataset.
val qbeast_df =
spark
.read
.format("qbeast")
.load(tmp_dir)
Sample the data, and notice how the sampler is converted into filters and pushed down to the source!
qbeast_df.sample(0.1).explain(true)
Go to the Quickstart or notebook for more details.
Get insights or execute operations on the data using the QbeastTable interface!
import io.qbeast.spark.QbeastTable
val qbeastTable = QbeastTable.forPath(spark, tmp_dir)
qbeastTable.getIndexMetrics()
qbeastTable.analyze()
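If your version of QbeastTable also exposes an `optimize()` operation (an assumption here; check the QbeastTable documentation), it can be run after `analyze()` to act on the announced cubes:

```scala
// Continuing the session above: analyze() announces the cubes that would benefit
// from optimization; optimize() (assumed to be available) then rewrites/replicates them.
qbeastTable.analyze()
qbeastTable.optimize()
```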
The format supports Spark SQL syntax, and the index is updated dynamically when new data is inserted.
// myTable is assumed to be an existing table stored in the qbeast format
val newData = Seq(1, 2, 3, 4).toDF("value")
newData.createOrReplaceTempView("newTable")
spark.sql("insert into table myTable select * from newTable")
spark.sql("insert into table myTable (value) values (4)")
Go to the QbeastTable documentation for more detailed information.
| Version | Spark | Hadoop | Delta Lake |
|---|---|---|---|
| 0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
| 0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
Check here for Delta Lake and Apache Spark version compatibility.
See Contribution Guide for more information.
See LICENSE.
See the Code of Conduct.