
Commit 813d183

fenzhu authored and GitHub Enterprise committed

[CARMEL-6345] Support backup table command (#1124)

* [CARMEL-6345] Support backup table command
* repair
* grammar
* comment

1 parent 9a4f8c3

File tree

5 files changed: +235 -0 lines changed


pom.xml

Lines changed: 6 additions & 0 deletions

@@ -1422,6 +1422,12 @@
          </exclusion>
        </exclusions>
      </dependency>
+     <dependency>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-distcp</artifactId>
+       <version>${hadoop.version}</version>
+       <scope>${hadoop.deps.scope}</scope>
+     </dependency>
      <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 0 deletions

@@ -266,6 +266,7 @@ statement
    | unsupportedHiveNativeCommands .*?                                #failNativeCommand
    | COMPACT TABLE target=tableIdentifier partitionSpec?
        (INTO fileNum=INTEGER_VALUE identifier)?                       #compactTable
+   | LOAD TABLE source=tableIdentifier TO target=tableIdentifier      #backupTable
    | SETTABLE tableIdentifier (READY | DONE) FOR
        ( DAY? '(' (date=STRING | dateExpr=primaryExpression) ')' |
        (DAY_RANGE? | WEEK_RANGE? | MONTH_RANGE?) '('
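
The new alternative sits alongside compactTable and appears to reuse the existing LOAD, TABLE, and TO tokens, so the backup syntax introduces no new keywords. A minimal illustration of the statement shape (hypothetical database and table names, assuming a SparkSession `spark` built from this branch):

    // Hypothetical names; the statement follows the #backupTable rule above.
    spark.sql("LOAD TABLE db1.tb1 TO db2.tb2")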

sql/core/pom.xml

Lines changed: 4 additions & 0 deletions

@@ -173,6 +173,10 @@
      <artifactId>jackson-mapper-asl</artifactId>
      <scope>test</scope>
    </dependency>
+   <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-distcp</artifactId>
+   </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 7 additions & 0 deletions

@@ -1032,4 +1032,11 @@ class SparkSqlAstBuilder extends AstBuilder {
    val toInfo = if (ctx.to == null) null else string(ctx.to)
    AnalyzeTableLineageCommand(persist, fileInfo, dir, prefixInfo, fromInfo, toInfo)
  }
+
+ override def visitBackupTable(ctx: BackupTableContext): LogicalPlan = withOrigin(ctx) {
+   val sourceTableName = visitTableIdentifier(ctx.source)
+   val targetTableName = visitTableIdentifier(ctx.target)
+   BackupTableCommand(sourceTableName, targetTableName)
+ }
+
}
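
The visitor added above lifts the two table identifiers out of the parse context and wraps them in the BackupTableCommand defined in the new file below. A parser-level sketch of the expected plan (assumes a SparkSession `spark` running this branch):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.command.BackupTableCommand

    // Sketch: the statement should now parse into the dedicated command node
    // rather than falling through to failNativeCommand.
    val plan = spark.sessionState.sqlParser.parsePlan("LOAD TABLE db1.tb1 TO db2.tb2")
    assert(plan == BackupTableCommand(
      TableIdentifier("tb1", Some("db1")),
      TableIdentifier("tb2", Some("db2"))))
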
Lines changed: 217 additions & 0 deletions

@@ -0,0 +1,217 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.tools.{DistCp, DistCpOptions}

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType

/**
 * Support to backup source table to target table
 * The syntax of using this command in SQL is:
 * {{{
 *   LOAD TABLE db1.tb1 TO db2.tb2
 * }}}
 */
case class BackupTableCommand(sourceTableName: TableIdentifier,
    targetTableName: TableIdentifier) extends RunnableCommand {

  override val output: Seq[Attribute] = {
    AttributeReference("Backup Table Operations", StringType, nullable = false)() ::
      AttributeReference("Result", StringType, nullable = false)() :: Nil
  }

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val currentQueue = sparkSession.sparkContext.conf.get("spark.yarn.queue", "")
    if (!currentQueue.contains("reserved")
      && !currentQueue.contains("test")
      && !currentQueue.contains("staging")) {
      throw new SparkException(s"BACKUP TABLE command should only be executed in " +
        s"reserved, reserved-test and staging queues, current queue is $currentQueue.")
    }
    val result = new ArrayBuffer[Row]
    val catalog = sparkSession.sessionState.catalog
    if (catalog.tableExists(targetTableName)) {
      throw new AnalysisException(s"Target table already exists!")
    }
    val sourceTable = catalog.getTableMetadata(sourceTableName)
    val sourceTablePath = new Path(sourceTable.location)
    val sourceTableIdentWithDB = sourceTableName.quotedString
    if (sourceTable.tableType == CatalogTableType.VIEW) {
      throw new AnalysisException(s"BACKUP TABLE command is not " +
        s"allowed on a view: $sourceTableIdentWithDB")
    }
    if (CatalogUtils.isTemporaryTable(sourceTable)) {
      throw new AnalysisException(s"BACKUP TABLE command is not " +
        s"allowed on a temporary table: $sourceTableIdentWithDB")
    }

    val conf = sparkSession.sessionState.newHadoopConf()

    val targetTableLoc = catalog.defaultTablePath(targetTableName)
    val targetTablePath = new Path(targetTableLoc)
    val fs = targetTablePath.getFileSystem(conf)
    val qualifiedTargetPath = fs.makeQualified(targetTablePath)
    if (fs.exists(qualifiedTargetPath)) {
      throw new AnalysisException(
        s"Default target table path already exists: ${targetTablePath.toString}")
    }

    val cpData = runDistCp(fs, currentQueue,
      java.util.Arrays.asList(sourceTablePath), qualifiedTargetPath, conf)
    val copied = cpData._1
    val step1 = s"Step 1: DistCP $sourceTablePath to $qualifiedTargetPath"
    val msg = cpData._2
    result.append(Row(step1, msg))
    if (!copied) {
      return result
    }

    val ddlProp = Map(DDLUtils.DDL_TIME -> (System.currentTimeMillis() / 1000).toString,
      "backup_table_source" -> sourceTableName.unquotedString)
    val newTableDesc = sourceTable
      .copy(identifier = targetTableName,
        createTime = System.currentTimeMillis,
        properties = sourceTable.properties ++ ddlProp)
      .withNewStorage(locationUri = Some(targetTableLoc))
    // Table location is already validated. No need to check it again during table creation.
    catalog.createTable(newTableDesc, ignoreIfExists = false, validateLocation = false)

    val step2 = s"Step 2: Create target table ${targetTableName.unquotedString}"
    if (catalog.tableExists(targetTableName)) {
      result.append(Row(step2, "Success"))
    } else {
      result.append(Row(step2, "Failed"))
      deleteDirectory(fs, qualifiedTargetPath)
      return result
    }

    if (sourceTable.partitionColumnNames.nonEmpty) {
      val targetTable = catalog.getTableMetadata(targetTableName)
      try {
        // Need to recover partitions into the metastore so our saved data is visible.
        sparkSession.sessionState.executePlan(
          AlterTableRecoverPartitionsCommand(targetTable.identifier)).toRdd
        result.append(Row("Step 2 (repair table partitions)", "Success"))
      } catch {
        case e: Exception =>
          result.append(Row("Step 2 (repair table partitions)", s"Failed (${e.toString})"))
          catalog.dropTable(targetTableName, ignoreIfNotExists = true, purge = true)
          return result
      }
    }

    val step3 = s"Step 3: Validate row count"
    val skipValidation = sparkSession.sessionState.conf.getConfString(
      "spark.sql.backup.table.validation.skip", "false").toBoolean
    if (skipValidation) {
      result.append(Row(step3, s"Skip backup table row count validation"))
    } else {
      val sc = sparkSession.table(sourceTableName).count()
      val tc = sparkSession.table(targetTableName).count()
      if (sc == tc) {
        result.append(Row(step3, s"Success ($sc)"))
      } else {
        result.append(Row(step3, s"Failed ($sc vs $tc)"))
        catalog.dropTable(targetTableName, ignoreIfNotExists = true, purge = true)
      }
    }

    result.append(Row("All Backup Table Operations", "Success"))
    result
  }

  private def runDistCp(fs: FileSystem, currentQueue: String, srcPaths: java.util.List[Path],
      dst: Path, conf: Configuration): (Boolean, String) = {
    val options = new DistCpOptions(srcPaths, dst)
    logInfo(s"DistCp options: $options")
    val params = constructDistCpParams(srcPaths, dst, conf)
    logInfo(s"DistCp parameters: $params")
    try {
      conf.setBoolean("mapred.mapper.new-api", true)
      conf.set("mapreduce.job.name", s"backup data from " +
        s"${sourceTableName.unquotedString} to ${targetTableName.unquotedString}")
      conf.set("mapreduce.job.queuename", currentQueue)
      conf.set("mapreduce.map.memory.mb", "4096")
      conf.set("mapreduce.map.java.opts", "-Xmx3072m")
      val distcp = new DistCp(conf, options)
      val res = distcp.run(params.toArray(new Array[String](params.size))) == 0
      (res, "Success")
    } catch {
      case e: Exception =>
        logError(s"Cannot execute DistCp process: ${e.toString}")
        deleteDirectory(fs, dst)
        (false, s"Fail to DistCp table data: ${e.toString}")
    }
  }

  private def constructDistCpParams(srcPaths: java.util.List[Path],
      dst: Path, conf: Configuration): java.util.ArrayList[String] = {
    val params = new java.util.ArrayList[String]()
    val DISTCP_OPTIONS_PREFIX = "distcp.options."
    val iter = conf.iterator()
    var specifyMaxTask = false
    while (iter.hasNext) {
      val tuple = iter.next()
      val key = tuple.getKey
      if (key.startsWith(DISTCP_OPTIONS_PREFIX)) {
        val skey = key.substring(DISTCP_OPTIONS_PREFIX.length)
        if (skey.equals("m")) {
          specifyMaxTask = true
        }
        params.add("-" + skey)
        val value = tuple.getValue
        if (value != null && value.nonEmpty) {
          params.add(value)
        }
      }
    }
    if (!specifyMaxTask) {
      params.add("-m")
      params.add("1000")
    }
    srcPaths.forEach(p => params.add(p.toString))
    params.add(dst.toString)
    params
  }

  private def deleteDirectory(fs: FileSystem, dir: Path): Unit = {
    if (fs.exists(dir)) {
      logInfo(s"Start to clean target path: $dir")
      try {
        if (fs.delete(dir, true)) {
          logInfo(s"Finish clean target path: $dir")
        } else {
          logWarning(s"Fail to clean target path: $dir, please delete it manually.")
        }
      } catch {
        case ee: Exception =>
          logError(s"Delete target path exception: $ee")
      }
    }
  }
}
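
End-to-end, the command is issued through SQL and returns one row per step. A usage sketch (hypothetical table names; assumes a SparkSession `spark` whose spark.yarn.queue contains "reserved", "test", or "staging", as required by the check in run()):

    // Backup db1.tb1 into a new table db2.tb2 at its default table path.
    val steps = spark.sql("LOAD TABLE db1.tb1 TO db2.tb2").collect()
    steps.foreach(println)
    // Illustrative output shape (paths and counts depend on the environment):
    //   [Step 1: DistCP <source path> to <target path>, Success]
    //   [Step 2: Create target table db2.tb2, Success]
    //   [Step 3: Validate row count, Success (<row count>)]
    //   [All Backup Table Operations, Success]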
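
The copy itself is delegated to Hadoop DistCp, and constructDistCpParams turns every Hadoop configuration key prefixed with distcp.options. into a command-line flag, falling back to -m 1000 when no mapper limit is supplied. A sketch of that passthrough (the key suffixes are ordinary DistCp flags; that these settings reach the command is an assumption based on sessionState.newHadoopConf() starting from the SparkContext Hadoop configuration):

    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("distcp.options.m", "200")           // becomes: -m 200
    hadoopConf.set("distcp.options.bandwidth", "50")    // becomes: -bandwidth 50
    hadoopConf.set("distcp.options.overwrite", "")      // empty value: only the -overwrite flag is added
    spark.sql("LOAD TABLE db1.tb1 TO db2.tb2")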
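
Row-count validation in Step 3 compares a full count() of the source and target tables, which can be expensive on large tables; run() checks spark.sql.backup.table.validation.skip before doing so. A short sketch of skipping it (session-level conf, hypothetical table names):

    spark.conf.set("spark.sql.backup.table.validation.skip", "true")
    spark.sql("LOAD TABLE db1.tb1 TO db2.tb2")
    // Step 3 then reports: "Skip backup table row count validation"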
