[KYUUBI #6691] A new Spark SQL command to merge small files #6695

Closed
wants to merge 35 commits into from
Changes from 15 commits · 35 commits
532369d
1. involve a compact table command to merge small files
gabrywu Sep 11, 2024
d2cbcc2
parser tests pass
gabrywu Sep 11, 2024
1a92be0
reformat all codes
gabrywu Sep 11, 2024
906b47e
SparkPlan resolved successfully
gabrywu Sep 11, 2024
4e9d8ca
reformat
gabrywu Sep 11, 2024
350326e
adding unit test to recover command
gabrywu Sep 12, 2024
0d30887
compact table execution tests pass
gabrywu Sep 12, 2024
a52d501
remove unnecessary comments
gabrywu Sep 12, 2024
7b10692
recover compact table command tests pass
gabrywu Sep 13, 2024
0ef55ff
more unit tests
gabrywu Sep 13, 2024
f04170b
fix scala style issue
gabrywu Sep 13, 2024
02f6303
reduce message count
gabrywu Sep 13, 2024
cc0ecce
involve createToScalaConverter
gabrywu Sep 13, 2024
32276b5
remove unused import
gabrywu Sep 13, 2024
c48b167
remove unused import
gabrywu Sep 13, 2024
b4fd2ad
remove unnecessary comment & reformat
gabrywu Sep 14, 2024
79e4e94
involve SPECULATION_ENABLED_SYNONYM
gabrywu Sep 14, 2024
257dfd6
involve createRandomTable
gabrywu Sep 15, 2024
66faca4
try to catch unknown Row
gabrywu Sep 15, 2024
fd5a3c4
use Seq instead of WrappedArray
gabrywu Sep 15, 2024
3775c95
compile on scala-2.13 successfully
gabrywu Sep 15, 2024
dccc23a
remove unused import
gabrywu Sep 15, 2024
62c1044
ByteUnit.MiB.toBytes to fix default target size
gabrywu Sep 15, 2024
42d1fc0
rename compact-table.md to docs
gabrywu Sep 16, 2024
232407a
remove unused comments
gabrywu Sep 16, 2024
5a34b97
support orc
gabrywu Sep 16, 2024
c042dfa
involve toJavaList to compile on scala 2.13
gabrywu Sep 16, 2024
d35f7d4
add bzip2 unit tests
gabrywu Sep 16, 2024
8b637de
spotless:apply
gabrywu Sep 16, 2024
7fbc805
fix getCodecFromFilePath for orc
gabrywu Sep 16, 2024
59f0b99
reformat
gabrywu Sep 16, 2024
ab9b674
support more codec
gabrywu Sep 18, 2024
22f0b79
remove unused util class, close opened stream in finally block
gabrywu Sep 18, 2024
8cc2390
rollback regardless of the success or failure of the command
gabrywu Sep 18, 2024
fd39f66
reformat
gabrywu Sep 18, 2024
36 changes: 36 additions & 0 deletions extensions/spark/kyuubi-extension-spark-3-5/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# kyuubi-extension-spark-3-5

## compact table command

It's a new Spark SQL command that compacts the small files in a table into larger files, such as 128 MB. After the
compaction is done, it creates a temporary view to query the compacted file details.

### syntax

#### compact table

```sparksql
compact table table_name [INTO ${targetFileSize} ${targetFileSizeUnit}] [cleanup | retain | list]
-- targetFileSizeUnit can be 'M' or 'MB' (the only units the grammar currently accepts)
-- cleanup: remove the compact staging folders, which hold the original small files (default behavior)
-- retain: keep the compact staging folders, e.g. for testing; the staging data can be used to recover the table
-- list: only report the planned merge result without actually running the compaction
```
#### recover table
```sparksql
recover compact table table_name
-- recover a compacted table, restoring the small files from the staging folder to their original location
```

### example

The following command will compact the small files in the table `default.small_files_table` into 128MB files, and create
a temporary view `v_merged_files` to query the compacted file details.

```sparksql
set spark.sql.shuffle.partitions=32;

compact table default.small_files_table;

select * from v_merged_files;
```
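
### recover example

A sketch of the retain/recover round trip (table name and size are illustrative; `retain` keeps the staging folders so the compaction can be undone):

```sparksql
-- compact into roughly 256 MB files and keep the staging folders
compact table default.small_files_table INTO 256 M retain;

-- inspect the merge result
select * from v_merged_files;

-- restore the original small files from the staging folders
recover compact table default.small_files_table;
```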
8 changes: 8 additions & 0 deletions extensions/spark/kyuubi-extension-spark-3-5/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<!-- Change this to match your Spark version -->
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ singleStatement

statement
: OPTIMIZE multipartIdentifier whereClause? zorderClause #optimizeZorder
| COMPACT TABLE multipartIdentifier
(INTO targetFileSize=INTEGER_VALUE FILE_SIZE_UNIT_LITERAL)?
(action=compactAction)? #compactTable
| RECOVER COMPACT TABLE multipartIdentifier #recoverCompactTable
| .*? #passThrough
;

Expand All @@ -62,6 +66,9 @@ zorderClause
: ZORDER BY order+=multipartIdentifier (',' order+=multipartIdentifier)*
;

compactAction
: CLEANUP | RETAIN | LIST
;
// We don't have an expression rule in our grammar here, so we just grab the tokens and defer
// parsing them to later.
predicateToken
Expand Down Expand Up @@ -101,6 +108,12 @@ nonReserved
| ZORDER
;

COMPACT: 'COMPACT';
INTO: 'INTO';
RECOVER: 'RECOVER';
CLEANUP: 'CLEANUP';
RETAIN: 'RETAIN';
LIST: 'LIST';
AND: 'AND';
BY: 'BY';
FALSE: 'FALSE';
Expand All @@ -115,7 +128,9 @@ WHERE: 'WHERE';
ZORDER: 'ZORDER';

MINUS: '-';

FILE_SIZE_UNIT_LITERAL:
'M' | 'MB'
;
BIGINT_LITERAL
: DIGIT+ 'L'
;
Expand Down
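For reference, these are the kinds of statements the new `#compactTable` and `#recoverCompactTable` rules should accept (table names are illustrative):

```sparksql
COMPACT TABLE db.small_files_table;
COMPACT TABLE db.small_files_table INTO 128 M;
COMPACT TABLE db.small_files_table INTO 128 MB LIST;
RECOVER COMPACT TABLE db.small_files_table;
```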
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Sort}

import org.apache.kyuubi.sql.KyuubiSparkSQLParser._
import org.apache.kyuubi.sql.compact.{CompactTableOptions, CompactTableStatement, RecoverCompactTableStatement}
import org.apache.kyuubi.sql.zorder.{OptimizeZorderStatement, Zorder}

class KyuubiSparkSQLAstBuilder extends KyuubiSparkSQLBaseVisitor[AnyRef] with SQLConfHelper {
Expand Down Expand Up @@ -127,6 +128,20 @@ class KyuubiSparkSQLAstBuilder extends KyuubiSparkSQLBaseVisitor[AnyRef] with SQ
UnparsedPredicateOptimize(tableIdent, predicate, orderExpr)
}

override def visitCompactTable(ctx: CompactTableContext): CompactTableStatement =
withOrigin(ctx) {
val tableParts = visitMultipartIdentifier(ctx.multipartIdentifier())
val targetFileSize = Option(ctx.targetFileSize).map(_.getText.toLong)
val action = Option(ctx.action).map(_.getText)
CompactTableStatement(tableParts, targetFileSize, CompactTableOptions(action))
}

override def visitRecoverCompactTable(ctx: RecoverCompactTableContext)
: RecoverCompactTableStatement = withOrigin(ctx) {
val tableParts = visitMultipartIdentifier(ctx.multipartIdentifier())
RecoverCompactTableStatement(tableParts)
}

override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null

override def visitMultipartIdentifier(ctx: MultipartIdentifierContext): Seq[String] =
Expand Down
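Assuming the grammar above, the builder should turn the new statements into the following parsed plans (a minimal sketch; the table name and size are illustrative):

```scala
import org.apache.kyuubi.sql.compact.{CompactTableOptions, CompactTableStatement, RecoverCompactTableStatement}

// "COMPACT TABLE db.t INTO 128 M RETAIN" is expected to produce
val compact = CompactTableStatement(
  tableParts = Seq("db", "t"),
  targetSizeInMB = Some(128L),
  options = CompactTableOptions.RetainStagingFolder)

// "RECOVER COMPACT TABLE db.t" is expected to produce
val recover = RecoverCompactTableStatement(tableParts = Seq("db", "t"))
```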
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.kyuubi.sql

import org.apache.spark.sql.SparkSessionExtensions

import org.apache.kyuubi.sql.compact.CompactTableResolver
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder}

class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
Expand All @@ -32,6 +33,7 @@ object KyuubiSparkSQLCommonExtension {
// inject zorder parser and related rules
extensions.injectParser { case (_, parser) => new SparkKyuubiSparkSQLParser(parser) }
extensions.injectResolutionRule(ResolveZorder)
extensions.injectResolutionRule(CompactTableResolver)

// Note that:
// InsertZorderBeforeWritingDatasource and InsertZorderBeforeWritingHive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package org.apache.kyuubi.sql

import org.apache.spark.sql.{FinalStageResourceManager, InjectCustomResourceProfile, SparkSessionExtensions}

import org.apache.kyuubi.sql.compact.CompactTableSparkStrategy
import org.apache.kyuubi.sql.watchdog.{ForcedMaxOutputRowsRule, KyuubiUnsupportedOperationsCheck, MaxScanStrategy}

// scalastyle:off line.size.limit

/**
* Depend on Spark SQL Extension framework, we can use this extension follow steps
* 1. move this jar into $SPARK_HOME/jars
* 2. add config into `spark-defaults.conf`: `spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension`
* 2. add config into `spark-defaults.conf`: `spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension`
*/
// scalastyle:on line.size.limit
class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
Expand All @@ -40,6 +42,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
extensions.injectOptimizerRule(ForcedMaxOutputRowsRule)
extensions.injectPlannerStrategy(MaxScanStrategy)
extensions.injectPlannerStrategy(CompactTableSparkStrategy)

extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
Expand Down
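Following the installation steps in the scaladoc above, enabling the extension can also be sketched programmatically (a minimal, illustrative example that assumes the extension jar is already on the classpath and the table exists):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "org.apache.kyuubi.sql.KyuubiSparkSQLExtension")
  .getOrCreate()

// with CompactTableSparkStrategy injected, the new command is planned and executed
spark.sql("COMPACT TABLE default.small_files_table").show()
```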
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.sql

import java.lang.reflect.Method

import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.{FileMetaData, GlobalMetaData}

object ParquetFileWriterWrapper {
// ParquetFileWriter.mergeInto is not accessible from outside its package, so a direct call fails with:
// Caused by: java.lang.IllegalAccessError: tried to access method
// org.apache.parquet.hadoop.ParquetFileWriter.mergeInto(
// Lorg/apache/parquet/hadoop/metadata/FileMetaData;
// Lorg/apache/parquet/hadoop/metadata/GlobalMetaData;Z)
// Lorg/apache/parquet/hadoop/metadata/GlobalMetaData;
// from class org.apache.parquet.hadoop.ParquetFileWriterWrapper$

val mergeInfoField: Method = classOf[ParquetFileWriter]
.getDeclaredMethod(
"mergeInto",
classOf[FileMetaData],
classOf[GlobalMetaData],
classOf[Boolean])

mergeInfoField.setAccessible(true)

def mergeInto(
toMerge: FileMetaData,
mergedMetadata: GlobalMetaData,
strict: Boolean): GlobalMetaData = {
mergeInfoField.invoke(
null,
toMerge.asInstanceOf[AnyRef],
mergedMetadata.asInstanceOf[AnyRef],
strict.asInstanceOf[AnyRef]).asInstanceOf[GlobalMetaData]
}
}
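A hypothetical sketch of how this wrapper could be used to accumulate the `GlobalMetaData` of several small parquet files before writing a merged footer (the paths and the `strict` flag are illustrative, not the PR's actual call site):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.GlobalMetaData

import org.apache.kyuubi.sql.ParquetFileWriterWrapper

def mergeFooters(conf: Configuration, files: Seq[Path]): GlobalMetaData =
  files.foldLeft(null: GlobalMetaData) { (merged, file) =>
    // read each file's footer and fold its FileMetaData into the running GlobalMetaData
    val footer = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER)
    ParquetFileWriterWrapper.mergeInto(footer.getFileMetaData, merged, strict = true)
  }
```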
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.sql.compact

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.{Row, SparkInternalExplorer, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{DropTableCommand, LeafRunnableCommand}

case class CachePerformanceViewCommand(
tableIdentifier: Seq[String],
performancePlan: LogicalPlan,
originalFileLocations: Seq[String],
options: CompactTableOption) extends LeafRunnableCommand {

override def innerChildren: Seq[QueryPlan[_]] = Seq(performancePlan)

override def run(sparkSession: SparkSession): Seq[Row] = {
val dropViewCommand = DropTableCommand(
CompactTableUtils.getTableIdentifier(tableIdentifier),
ifExists = true,
isView = true,
purge = true)
dropViewCommand.run(sparkSession)

val speculation =
sparkSession.sparkContext.getConf.getBoolean("spark.speculation", defaultValue = false)
if (speculation) {
sparkSession.sparkContext.getConf.set("spark.speculation", "false")
gabrywu marked this conversation as resolved.
Show resolved Hide resolved
log.warn("set spark.speculation to false")
}

val cacheTableCommand =
SparkInternalExplorer.CacheTableAsSelectExec(tableIdentifier.head, performancePlan)

// this result is always empty
cacheTableCommand.run()

if (options == CompactTableOptions.CleanupStagingFolder) {
val fileSystem = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
originalFileLocations.foreach { originalFileLocation =>
val compactStagingDir = CompactTableUtils.getCompactStagingDir(originalFileLocation)
fileSystem.delete(compactStagingDir, true)
}

}
if (speculation) {
sparkSession.sparkContext.getConf.set("spark.speculation", "true")
log.warn("rollback spark.speculation to true")
}
Seq.empty[Row]
}

}
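Commit 8cc2390 above ("rollback regardless of the success or failure of the command") suggests the speculation flag should be restored even if the cache step throws; a minimal sketch of that shape (not the PR's final code, names taken from the diff):

```scala
val conf = sparkSession.sparkContext.getConf
val speculation = conf.getBoolean("spark.speculation", defaultValue = false)
if (speculation) {
  conf.set("spark.speculation", "false")
}
try {
  SparkInternalExplorer.CacheTableAsSelectExec(tableIdentifier.head, performancePlan).run()
} finally {
  // restore the original setting whether or not caching succeeded
  if (speculation) {
    conf.set("spark.speculation", "true")
    log.warn("rollback spark.speculation to true")
  }
}
```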
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.sql.compact

import org.apache.spark.sql.catalyst.analysis.UnresolvedUnaryNode
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.{LeafParsedStatement, LogicalPlan}
import org.apache.spark.sql.types._

object CompactTable {
private val fileLocAndSizeStructArrayType: ArrayType =
DataTypes.createArrayType(DataTypes.createStructType(Array(
DataTypes.createStructField("sub_group_id", IntegerType, false),
DataTypes.createStructField("name", StringType, false),
DataTypes.createStructField("length", LongType, false))))

val smallFileCollectOutput: StructType = DataTypes.createStructType(Array(
DataTypes.createStructField("group_id", IntegerType, false),
DataTypes.createStructField("location", StringType, false),
DataTypes.createStructField("data_source", StringType, false),
DataTypes.createStructField("codec", StringType, true),
DataTypes.createStructField("smallFiles", fileLocAndSizeStructArrayType, false)))

val smallFileCollectOutputAttribute: Seq[AttributeReference] = smallFileCollectOutput
.map(field => AttributeReference(field.name, field.dataType, field.nullable)())

val mergedFilesCachedTableName = "v_merged_files"
val mergeMetadataKey = "spark.sql.compact.parquet.metadata.merge"
}

trait CompactTableOption

object CompactTableOptions {
def apply(options: Option[String]): CompactTableOption = options.map(_.toLowerCase) match {
case Some("retain") => RetainStagingFolder
case Some("list") => DryRun
case _ => CleanupStagingFolder
}

case object CleanupStagingFolder extends CompactTableOption

case object RetainStagingFolder extends CompactTableOption

case object DryRun extends CompactTableOption
}

case class CompactTable(
child: LogicalPlan,
targetSizeInBytes: Option[Long],
options: CompactTableOption) extends UnresolvedUnaryNode {
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
CompactTable(newChild, targetSizeInBytes, options)
}
}

case class CompactTableStatement(
tableParts: Seq[String],
targetSizeInMB: Option[Long],
options: CompactTableOption) extends LeafParsedStatement

case class RecoverCompactTableStatement(tableParts: Seq[String])
extends LeafParsedStatement

case class RecoverCompactTable(child: LogicalPlan) extends UnresolvedUnaryNode {
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
RecoverCompactTable(newChild)
}
}
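For reference, `CompactTableOptions.apply` maps the optional action keyword from the parser to these values (derived directly from the code above):

```scala
import org.apache.kyuubi.sql.compact.CompactTableOptions

assert(CompactTableOptions(None) == CompactTableOptions.CleanupStagingFolder) // default
assert(CompactTableOptions(Some("CLEANUP")) == CompactTableOptions.CleanupStagingFolder)
assert(CompactTableOptions(Some("RETAIN")) == CompactTableOptions.RetainStagingFolder)
assert(CompactTableOptions(Some("LIST")) == CompactTableOptions.DryRun)
```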