Skip to content

Commit

Permalink
add index refresh
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <penghuo@gmail.com>
  • Loading branch information
penghuo committed Mar 6, 2023
1 parent f9018da commit aec823a
Showing 1 changed file with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -463,13 +463,29 @@ case class CreateDeltaTableCommand(
convertProperties: ConvertTarget
): Unit = {
def myFunc(batchDF: DataFrame, batchID: Long): Unit = {
deltaLog.withNewTransaction { txn => {
val paths = batchDF.collect().map(row => new Path(row(0).toString))
logWarning(s"refresh with files: $paths")
performConvert(spark, txn, convertProperties, Option(paths))
deltaLog.withNewTransaction { txn => {
logWarning(s"=== Refresh with files ===")
val paths = batchDF.collect().map(row => {
val path = row(0).toString
logWarning(s"New file: $path")
new Path(path)
})
performConvert(spark, txn, convertProperties, Option(paths))
}
}

/**
* TODO: refresh all index/MV in current Delta table metadata
*/
logWarning("=== Refreshing index ===")
val tableName = table.qualifiedName
val indexes = spark.sql(s"SHOW INDEXES ON $tableName")
for (index <- indexes.select("Name").collect.map(_(0))) {
logWarning(s"Index: $index")
spark.sql(s"REFRESH INDEX $index ON $tableName")
}
}

val streamDF = spark.readStream
.schema(table.schema)
.format("parquet")
Expand Down

0 comments on commit aec823a

Please sign in to comment.