Qbeast Spark is an extension for Data Lakehouses that enables multi-dimensional filtering and sampling directly on the storage layer.
- Data Lakehouse: a data lake with ACID properties, thanks to the underlying Delta Lake architecture.
- Multi-column indexing: filter your data on multiple columns using the Qbeast Format.
- Improved Sampling operator: read statistically significant subsets of files.
- Table Tolerance: a model for the sampling-fraction vs. query-accuracy trade-off.
As the comparison below shows, the Qbeast Spark extension allows faster queries with statistically accurate sampling.
Format | Execution Time | Result |
---|---|---|
Delta | ~ 151.3 sec. | 37.869383 |
Qbeast | ~ 6.6 sec. | 37.856333 |
In this example, 1% sampling returns the result about 22x faster than the Delta format, with an error of only 0.034%.
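The numbers above come from a sampled aggregation; below is a minimal sketch of a query of that shape (the table path and the aggregated column `price` are placeholders for illustration, not the exact benchmark setup):

```scala
import org.apache.spark.sql.functions.avg

// Read a qbeast-formatted table and aggregate over a 1% sample.
// The sample() call is pushed down to the format, so only a fraction
// of the files is actually read. Path and column name are placeholders.
val sampledAvg = spark.read
  .format("qbeast")
  .load("/tmp/qbeast-table")
  .sample(0.01)
  .agg(avg("price"))
  .first()
```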
You can run qbeast-spark locally on your computer, or use the Docker image we have prepared with all the dependencies. You can find it in the Packages section.
Download Spark 3.4.1 with Hadoop 3.3.4, extract it, and set the SPARK_HOME environment variable:
ℹ️ Note: You can use Hadoop 2.7 if desired, but you may run into issues with some cloud providers' storage; read more about it here.
wget https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
tar -xzvf spark-3.4.1-bin-hadoop3.tgz
export SPARK_HOME=$PWD/spark-3.4.1-bin-hadoop3
Inside the project folder, launch a spark-shell with the required dependencies:
$SPARK_HOME/bin/spark-shell \
--packages io.qbeast:qbeast-spark_2.12:0.5.0,io.delta:delta-core_2.12:2.4.0 \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
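If you prefer a standalone application over the shell, the same extension and catalog can be configured on the SparkSession builder; a sketch, assuming the qbeast and delta artifacts are already on the classpath (e.g. via `--packages` or your build tool):

```scala
import org.apache.spark.sql.SparkSession

// Programmatic equivalent of the spark-shell --conf flags above.
val spark = SparkSession.builder()
  .appName("qbeast-quickstart")
  .config("spark.sql.extensions",
    "io.qbeast.spark.internal.QbeastSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "io.qbeast.spark.internal.sources.catalog.QbeastCatalog")
  .getOrCreate()
```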
Read the CSV source file placed inside the project.
val csv_df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./src/test/resources/ecommerce100K_2019_Oct.csv")
Index the dataset by writing it into the qbeast format, specifying the columns to index.
val tmp_dir = "/tmp/qbeast-spark"
csv_df.write
.mode("overwrite")
.format("qbeast")
.option("columnsToIndex", "user_id,product_id")
.save(tmp_dir)
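The index granularity can also be tuned with the `cubeSize` option (the value below is an arbitrary example; check the documentation of your qbeast-spark version for the default and recommended values):

```scala
// Same write as above, with an explicit target number of rows per cube.
// The value 10000 is only an example for this small dataset.
csv_df.write
  .mode("overwrite")
  .format("qbeast")
  .option("columnsToIndex", "user_id,product_id")
  .option("cubeSize", "10000")
  .save(tmp_dir)
```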
You can create a table with Qbeast with the help of the QbeastCatalog.
spark.sql(
"CREATE TABLE student (id INT, name STRING, age INT) " +
"USING qbeast OPTIONS ('columnsToIndex'='id')")
Use INSERT INTO to add records to the new table. The index is updated dynamically as new data is inserted.
spark.sql("INSERT INTO table student SELECT * FROM visitor_students")
Load the newly indexed dataset.
val qbeast_df =
spark
.read
.format("qbeast")
.load(tmp_dir)
Sample the data, and notice how the sampler is converted into filters and pushed down to the data source:
qbeast_df.sample(0.1).explain(true)
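Filtering on the indexed columns is pushed down in the same way, so only the matching files are read (the predicate values below are arbitrary examples):

```scala
// Range predicates on the indexed columns (user_id, product_id) let the
// format skip files that cannot contain matching rows.
qbeast_df
  .filter("user_id >= 536000000 AND product_id >= 1000000")
  .explain(true)
```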
Go to the Quickstart or notebook for more details.
Get insights or execute operations on the data using the QbeastTable interface:
import io.qbeast.spark.QbeastTable

// Get index metrics and run analysis on the indexed table.
val qbeastTable = QbeastTable.forPath(spark, tmp_dir)
qbeastTable.getIndexMetrics()
qbeastTable.analyze()
Go to QbeastTable documentation for more detailed information.
Use the Python index visualizer on your indexed table to visually examine the index structure and gather sampling metrics.
Version | Spark | Hadoop | Delta Lake |
---|---|---|---|
0.1.0 | 3.0.0 | 3.2.0 | 0.8.0 |
0.2.0 | 3.1.x | 3.2.0 | 1.0.0 |
0.3.x | 3.2.x | 3.3.x | 1.2.x |
0.4.x | 3.3.x | 3.3.x | 2.1.x |
0.5.x | 3.4.x | 3.3.x | 2.4.x |
Check here for Delta Lake and Apache Spark version compatibility.
See Contribution Guide for more information.
See LICENSE.
See the Code of Conduct.