Skip to content

Commit

Permalink
[Spark] Add Clustering example
Browse files Browse the repository at this point in the history
Add Clustering example

Closes #2520

GitOrigin-RevId: 895b82e456dd1cd26c7e6b13d84f4ad4428b9913
  • Loading branch information
zedtang authored and vkorukanti committed Jan 17, 2024
1 parent 0fbc0e1 commit 007195d
Showing 1 changed file with 61 additions and 0 deletions.
61 changes: 61 additions & 0 deletions examples/scala/src/main/scala/example/Clustering.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example

import org.apache.spark.sql.SparkSession

/**
 * Example program that creates a Delta table declared with `CLUSTER BY`,
 * inserts one row into it, and runs `OPTIMIZE` on it, all through Spark SQL.
 *
 * The table is dropped and the local Spark session is stopped before the
 * program exits, whether or not the SQL statements succeed.
 */
object Clustering {

  /**
   * Entry point: runs the clustering example against a local Spark session.
   *
   * @param args command-line arguments; not used by this example
   */
  def main(args: Array[String]): Unit = {
    val tableName = "deltatable"

    val deltaSpark = SparkSession
      .builder()
      .appName("Clustering-Delta")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // Outer try/finally: once the session exists it must always be stopped,
    // even if the setup statements or the table cleanup below throw.
    try {
      // Drop any table left behind by a previous run
      deltaSpark.sql(s"DROP TABLE IF EXISTS $tableName")

      // Enable preview config for clustering
      deltaSpark.conf.set(
        "spark.databricks.delta.clusteredTable.enableClusteringTablePreview", "true")

      try {
        // Create a table
        println("Creating a table")
        deltaSpark.sql(
          s"""CREATE TABLE $tableName (col1 INT, col2 STRING) using DELTA
             |CLUSTER BY (col1, col2)""".stripMargin)

        // Insert new data
        println("Insert new data")
        deltaSpark.sql(s"INSERT INTO $tableName VALUES (123, '123')")

        // Optimize the table
        println("Optimize the table")
        deltaSpark.sql(s"OPTIMIZE $tableName")
      } finally {
        // Cleanup: remove the example table regardless of success or failure
        deltaSpark.sql(s"DROP TABLE IF EXISTS $tableName")
      }
    } finally {
      // Runs even when the cleanup DROP TABLE above fails
      deltaSpark.stop()
    }
  }
}

0 comments on commit 007195d

Please sign in to comment.