Qbeast Spark is an extension for Data Lakehouses that enables multi-dimensional filtering and sampling directly on the storage.
- Data Lakehouse: data lake with ACID properties, thanks to the underlying Delta Lake architecture.
- Multi-column indexing: filter your data on multiple columns using the Qbeast Format.
- Improved Sampling operator: read statistically significant subsets of files.
- Table Tolerance: model for the trade-off between sampling fraction and query accuracy.
The Qbeast Spark extension allows faster queries with statistically accurate sampling:
Format | Execution Time | Result |
---|---|---|
Delta | ~ 151.3 sec. | 37.869383 |
Qbeast | ~ 6.6 sec. | 37.856333 |
In this example, 1% sampling returns the result roughly 22 times faster than the Delta format, with an error of 0.034%.
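The two figures quoted above follow directly from the table. As a quick check, the arithmetic can be reproduced in any Scala REPL (or in spark-shell); the variable names below are illustrative and do not come from the project.

```scala
// Values copied from the comparison table above.
val deltaSeconds = 151.3
val qbeastSeconds = 6.6
val deltaResult = 37.869383
val qbeastResult = 37.856333

// Speedup: how many times shorter the sampled query ran.
val speedup = deltaSeconds / qbeastSeconds

// Relative error of the sampled result against the full result, in percent.
val relativeErrorPct = math.abs(qbeastResult - deltaResult) / deltaResult * 100

println(s"speedup = $speedup, relative error (%) = $relativeErrorPct")
```

The speedup comes out at about 22.9 and the relative error at about 0.034%, in line with the figures quoted above.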
You can run the qbeast-spark application locally on your computer, or use a Docker image that we have already prepared with the dependencies. You can find it in the Packages section.
Download Spark 3.4.1 with Hadoop 3.3.4, unzip it, and create the SPARK_HOME environment variable:
ℹ️ Note: You can use Hadoop 2.7 if desired, but you may run into problems with some cloud providers' storage; read more about it here.
wget https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
tar -xzvf spark-3.4.1-bin-hadoop3.tgz
export SPARK_HOME=$PWD/spark-3.4.1-bin-hadoop3
Inside the project folder, launch a Spark shell with the required dependencies:
$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.5.0,io.delta:delta-core_2.12:2.4.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
Read the CSV source file placed inside the project.
val csvDF = spark.read.format("csv").
option("header", "true").
option("inferSchema", "true").
load("./src/test/resources/ecommerce100K_2019_Oct.csv")
Index the dataset by writing it in the qbeast format, specifying the columns to index.
val tmpDir = "/tmp/qbeast-spark"
csvDF.write.
mode("overwrite").
format("qbeast").
option("columnsToIndex", "user_id,product_id").
save(tmpDir)
You can create a table with Qbeast with the help of QbeastCatalog.
spark.sql(
"CREATE TABLE student (id INT, name STRING, age INT) " +
"USING qbeast OPTIONS ('columnsToIndex'='id')")
Use INSERT INTO to add records to the new table. The index is updated dynamically as new data is inserted.
spark.sql("INSERT INTO table student SELECT * FROM visitor_students")
Load the newly indexed dataset.
val qbeastDF =
spark.
read.
format("qbeast").
load(tmpDir)
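Because the table was indexed on user_id and product_id, a filter that constrains both columns is the kind of query the multi-column index is meant for. The following is a minimal sketch that continues the spark-shell session above (it relies on the existing qbeastDF); the two bounds are hypothetical placeholders, so replace them with values that make sense for your data.

```scala
// Continues the spark-shell session above: `qbeastDF` was loaded from tmpDir.
import org.apache.spark.sql.functions.col

// Hypothetical bounds, chosen only for illustration.
val minUserId = 0L
val minProductId = 0L

// Constrain both indexed columns in a single predicate.
val filtered = qbeastDF.filter(
  col("user_id") >= minUserId && col("product_id") >= minProductId)

// Print the query plans to see how the predicate is handled at the source.
filtered.explain(true)
```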
Sample the data and notice how the sampler is converted into filters and pushed down to the source!
qbeastDF.sample(0.1).explain(true)
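To see the accuracy side of the trade-off, you can compare an aggregate computed on a sample with the same aggregate on the full table. This sketch continues the spark-shell session above and assumes the dataset has a numeric column named price; that column name is an assumption, so substitute any numeric column from your data.

```scala
// Continues the spark-shell session above: `qbeastDF` was loaded from tmpDir.
import org.apache.spark.sql.functions.avg

// Assumption: `price` is a numeric column in the indexed dataset.
val exactAvg = qbeastDF.agg(avg("price")).first().getDouble(0)
val sampledAvg = qbeastDF.sample(0.1).agg(avg("price")).first().getDouble(0)

// Relative error of the sampled estimate against the exact value, in percent.
val relErrPct = math.abs(sampledAvg - exactAvg) / math.abs(exactAvg) * 100

println(s"exact = $exactAvg, sampled = $sampledAvg, relative error (%) = $relErrPct")
```

The sampled value varies from run to run, so treat the printed error as one observation, not a guarantee.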
Go to the Quickstart or notebook for more details.
Get insights or execute operations on the data using the QbeastTable interface!
import io.qbeast.spark.QbeastTable
val qbeastTable = QbeastTable.forPath(spark, tmpDir)
qbeastTable.getIndexMetrics()
qbeastTable.analyze()
Go to QbeastTable documentation for more detailed information.
Use the Python index visualizer on your indexed table to examine the index structure visually and gather sampling metrics.
Version | Spark | Hadoop | Delta Lake |
---|---|---|---|
0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
0.3.x | 3.2.x | 3.3.x | 1.2.x |
0.4.x | 3.3.x | 3.3.x | 2.1.x |
0.5.x | 3.4.x | 3.3.x | 2.4.x |
Check here for Delta Lake and Apache Spark version compatibility.
See Contribution Guide for more information.
See LICENSE.
See Code of conduct.