From 2a4f09b7ab125581c7caa2bf57513abc07ac3c7f Mon Sep 17 00:00:00 2001
From: kunal642
Date: Wed, 15 Mar 2017 14:38:31 +0530
Subject: [PATCH] added support to revert changes if query fails

---
 .../ThriftWrapperSchemaConverterImpl.java     |   4 +-
 .../carbondata/core/util/DataTypeUtil.java    |  16 +-
 .../execution/command/carbonTableSchema.scala |   2 +-
 .../command/AlterTableCommands.scala          |  48 +++---
 .../spark/sql/hive/CarbonMetastore.scala      |  30 +++-
 .../apache/spark/util/AlterTableUtil.scala    | 163 +++++++++++++++++-
 .../AlterTableRevertTestCase.scala            |  69 ++++++++
 .../spark/sql/common/util/QueryTest.scala     |   3 +
 8 files changed, 303 insertions(+), 32 deletions(-)
 create mode 100644 integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
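Reviewer note (not part of the diff): every ALTER flow changed below follows the same protect-and-revert shape: snapshot the table's lastUpdatedTime before any schema mutation, and on failure call the matching AlterTableUtil.revert* helper, which undoes the change only if a SchemaEvolutionEntry newer than the snapshot was actually persisted. A minimal sketch of that shape, condensed from the AlterTableAddColumns diff below:

    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
    // snapshot taken before the schema file is touched
    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
    try {
      // ... append SchemaEvolutionEntry, rewrite schema file, update metastore ...
    } catch {
      case e: Exception =>
        // no-op unless an evolution entry newer than the snapshot was written
        AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
        sys.error("Alter table add column operation failed. Please check the logs")
    } finally {
      // locks are released whether the command succeeded or was reverted
    }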
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 09ed3685a18..974cc81e07a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -404,8 +404,10 @@ private BucketingInfo fromExternalToWarpperBucketingInfo(
       org.apache.carbondata.format.TableInfo externalTableInfo, String dbName,
       String tableName, String storePath) {
     TableInfo wrapperTableInfo = new TableInfo();
+    List<org.apache.carbondata.format.SchemaEvolutionEntry> schemaEvolutionList =
+        externalTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history();
     wrapperTableInfo.setLastUpdatedTime(
-        externalTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history().get(0)
+        schemaEvolutionList.get(schemaEvolutionList.size() - 1)
             .getTime_stamp());
     wrapperTableInfo.setDatabaseName(dbName);
     wrapperTableInfo.setTableUniqueName(dbName + "_" + tableName);
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index e437405d760..76df425e3c3 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -573,10 +573,10 @@ public static int compareDoubleWithNan(Double d1, Double d2) {
    * Below method will be used to convert the data into byte[]
    *
    * @param data
-   * @param actualDataType actual data type
+   * @param columnSchema
    * @return actual data in byte[]
    */
-  public static byte[] convertDataToBytesBasedOnDataType(String data, DataType actualDataType) {
+  public static byte[] convertDataToBytesBasedOnDataType(String data, ColumnSchema columnSchema) {
     if (null == data) {
       return null;
     } else if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(data)) {
@@ -585,7 +585,7 @@ public static byte[] convertDataToBytesBasedOnDataType(String data, DataType act
     }
     try {
       long parsedIntVal = 0;
-      switch (actualDataType) {
+      switch (columnSchema.getDataType()) {
         case INT:
           parsedIntVal = (long) Integer.parseInt(data);
           return String.valueOf(parsedIntVal)
@@ -602,13 +602,17 @@ public static byte[] convertDataToBytesBasedOnDataType(String data, DataType act
               .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
         case DATE:
         case TIMESTAMP:
-          DirectDictionaryGenerator directDictionaryGenerator =
-              DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(actualDataType);
+          DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+              .getDirectDictionaryGenerator(columnSchema.getDataType());
           int value = directDictionaryGenerator.generateDirectSurrogateKey(data);
           return String.valueOf(value)
               .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
         case DECIMAL:
-          java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data);
+          String parsedValue = parseStringToBigDecimal(data, columnSchema);
+          if (null == parsedValue) {
+            return null;
+          }
+          java.math.BigDecimal javaDecVal = new java.math.BigDecimal(parsedValue);
           return bigDecimalToByte(javaDecVal);
         default:
           return UTF8String.fromString(data).getBytes();
Please check the logs") } finally { // release lock after command execution completion @@ -240,9 +247,10 @@ private[sql] case class AlterTableDropColumns( val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) val locks = AlterTableUtil .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession) + // get the latest carbon table and check for column existence + val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) + val lastUpdatedTime = carbonTable.getTableLastUpdatedTime try { - // get the latest carbon table and check for column existence - val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) // check each column existence in the table val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column @@ -269,12 +277,8 @@ private[sql] case class AlterTableDropColumns( } // take the total key column count. key column to be deleted should not // be >= key columns in schema - var totalKeyColumnInSchema = 0 - tableColumns.foreach { tableColumn => - // column should not be already deleted and should exist in the table - if (!tableColumn.isInvisible && tableColumn.isDimesion) { - totalKeyColumnInSchema += 1 - } + val totalKeyColumnInSchema = tableColumns.count { + tableColumn => !tableColumn.isInvisible && tableColumn.isDimesion } if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) { sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.") @@ -315,8 +319,9 @@ private[sql] case class AlterTableDropColumns( LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName") LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName") } catch { - case e: Exception => - LOGGER.error(e, s"Alter table drop columns failed : ${e.getMessage}") + case e: Exception => LOGGER + .error("Alter table drop columns failed : " + e.getMessage) + AlterTableUtil.revertDropColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession) sys.error("Alter table drop column operation failed. 
Please check the logs") } finally { // release lock after command execution completion @@ -339,11 +344,11 @@ private[sql] case class AlterTableDataTypeChange( val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) val locks = AlterTableUtil .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession) + // get the latest carbon table and check for column existence + val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) + val lastUpdatedTime = carbonTable.getTableLastUpdatedTime try { - // get the latest carbon table and check for column existence - val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) val columnName = alterTableDataTypeChangeModel.columnName - var carbonColumnToBeModified: CarbonColumn = null val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible) if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) { @@ -393,8 +398,9 @@ private[sql] case class AlterTableDataTypeChange( LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName") LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName") } catch { - case e: Exception => - LOGGER.error(e, s"Alter table change datatype failed : ${e.getMessage}") + case e: Exception => LOGGER + .error("Alter table change datatype failed : " + e.getMessage) + AlterTableUtil.revertDataTypeChanges(dbName, tableName, lastUpdatedTime)(sparkSession) sys.error("Alter table data type change operation failed. Please check the logs") } finally { // release lock after command execution completion diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala index 64604902b2e..6f749604198 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala @@ -301,7 +301,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) { } /** - * This method will overwrite the existing schema and update it with the gievn details + * This method will overwrite the existing schema and update it with the given details * * @param carbonTableIdentifier * @param thriftTableInfo @@ -327,6 +327,34 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) { carbonTableIdentifier.getTableName)(sparkSession) } + /** + * This method will is used to remove the evolution entry in case of failure. 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 64604902b2e..6f749604198 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -301,7 +301,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
   }

   /**
-   * This method will overwrite the existing schema and update it with the gievn details
+   * This method will overwrite the existing schema and update it with the given details
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
@@ -327,6 +327,34 @@
         carbonTableIdentifier.getTableName)(sparkSession)
   }

+  /**
+   * This method is used to remove the evolution entry in case of failure.
+   *
+   * @param carbonTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(thriftTableInfo,
+        carbonTableIdentifier.getDatabaseName,
+        carbonTableIdentifier.getTableName,
+        carbonStorePath)
+    val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
+    evolutionEntries.remove(evolutionEntries.size() - 1)
+    createSchemaThriftFile(wrapperTableInfo,
+      thriftTableInfo,
+      carbonTableIdentifier.getDatabaseName,
+      carbonTableIdentifier.getTableName)(sparkSession)
+  }
+
+
+
   /**
    *
    * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
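Reviewer note (not part of the diff): revertTableSchema is the single write path shared by all four revert helpers in AlterTableUtil below. It drops the newest SchemaEvolutionEntry and rewrites the schema thrift file, which also means the wrapper-side lastUpdatedTime, now derived from the last evolution entry (see the ThriftWrapperSchemaConverterImpl change above), falls back to its pre-command value. Roughly, from a caller's point of view:

    // thriftTableInfo was read from the schema file after the failed command
    val entries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
    val before = entries.size()
    CarbonEnv.get.carbonMetastore
      .revertTableSchema(tableIdentifier, thriftTableInfo, storePath)(sparkSession)
    // the failed command's evolution entry is gone and the file is rewritten
    assert(entries.size() == before - 1)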
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 2e7eebf777f..5057d7597e1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.util

+import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer

 import org.apache.spark.SparkConf
@@ -25,14 +26,28 @@
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
 import org.apache.spark.sql.hive.HiveExternalCatalog._

-import org.apache.carbondata.common.logging.LogService
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}

 object AlterTableUtil {
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * Validates that the table exists and acquires meta lock on it.
+   *
+   * @param dbName
+   * @param tableName
+   * @param LOGGER
+   * @param sparkSession
+   * @return
+   */
   def validateTableAndAcquireLock(dbName: String,
       tableName: String,
       locksToBeAcquired: List[String],
@@ -125,6 +140,13 @@
     }
   }

+  /**
+   * @param carbonTable
+   * @param schemaEvolutionEntry
+   * @param thriftTable
+   * @param sparkSession
+   * @param catalog
+   */
   def updateSchemaInfo(carbonTable: CarbonTable,
       schemaEvolutionEntry: SchemaEvolutionEntry,
       thriftTable: TableInfo)(sparkSession: SparkSession,
       catalog: HiveExternalCatalog): Unit = {
@@ -167,4 +189,141 @@
     }
     schemaParts.mkString(",")
   }
+
+  /**
+   * This method reverts the changes to the schema if the rename table command fails.
+   *
+   * @param oldTableIdentifier
+   * @param newTableName
+   * @param lastUpdatedTime
+   * @param sparkSession
+   */
+  def revertRenameTableChanges(oldTableIdentifier: TableIdentifier,
+      newTableName: String,
+      lastUpdatedTime: Long)
+    (sparkSession: SparkSession): Unit = {
+    val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+    val carbonTable: CarbonTable = CarbonMetadata.getInstance
+      .getCarbonTable(database + "_" + newTableName)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+      carbonTable.getCarbonTableIdentifier)
+    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val fileType = FileFactory.getFileType(tableMetadataFile)
+    val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
+      .readSchemaFile(tableMetadataFile)
+    val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
+    val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
+    if (updatedTime > lastUpdatedTime) {
+      LOGGER.error(s"Reverting changes for $database.${oldTableIdentifier.table}")
+      FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
+        .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
+                     oldTableIdentifier.table)
+      val tableIdentifier = new CarbonTableIdentifier(database,
+        oldTableIdentifier.table,
+        carbonTable.getCarbonTableIdentifier.getTableId)
+      CarbonEnv.get.carbonMetastore.revertTableSchema(tableIdentifier,
+        tableInfo,
+        carbonTable.getStorePath)(sparkSession)
+      CarbonEnv.get.carbonMetastore.removeTableFromMetadata(database, newTableName)
+    }
+  }
+
+  /**
+   * This method reverts the changes to the schema if add column command fails.
+   *
+   * @param dbName
+   * @param tableName
+   * @param lastUpdatedTime
+   * @param sparkSession
+   */
+  def revertAddColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
+    (sparkSession: SparkSession): Unit = {
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+
+
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+      carbonTable.getCarbonTableIdentifier)
+    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+      .readSchemaFile(tableMetadataFile)
+    val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
+    val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
+    if (updatedTime > lastUpdatedTime) {
+      LOGGER.error(s"Reverting changes for $dbName.$tableName")
+      val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
+      thriftTable.fact_table.table_columns.removeAll(addedSchemas)
+      CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
+        thriftTable, carbonTable.getStorePath)(sparkSession)
+    }
+  }
+
+  /**
+   * This method reverts the schema changes if the drop columns command fails.
+   *
+   * @param dbName
+   * @param tableName
+   * @param lastUpdatedTime
+   * @param sparkSession
+   */
+  def revertDropColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
+    (sparkSession: SparkSession): Unit = {
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+      carbonTable.getCarbonTableIdentifier)
+    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+      .readSchemaFile(tableMetadataFile)
+    val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
+    val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
+    if (updatedTime > lastUpdatedTime) {
+      LOGGER.error(s"Reverting changes for $dbName.$tableName")
+      val removedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).removed
+      thriftTable.fact_table.table_columns.asScala.foreach { columnSchema =>
+        removedSchemas.asScala.foreach { removedSchema =>
+          if (columnSchema.invisible && removedSchema.column_id == columnSchema.column_id) {
+            columnSchema.setInvisible(false)
+          }
+        }
+      }
+      CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
+        thriftTable, carbonTable.getStorePath)(sparkSession)
+    }
+  }
+
+  /**
+   * This method reverts the changes to the schema if the data type change command fails.
+   *
+   * @param dbName
+   * @param tableName
+   * @param lastUpdatedTime
+   * @param sparkSession
+   */
+  def revertDataTypeChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
+    (sparkSession: SparkSession): Unit = {
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+      carbonTable.getCarbonTableIdentifier)
+    val tableMetadataFile = carbonTablePath.getSchemaFilePath
+    val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+      .readSchemaFile(tableMetadataFile)
+    val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
+    val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
+    if (updatedTime > lastUpdatedTime) {
+      LOGGER.error(s"Reverting changes for $dbName.$tableName")
+      val removedColumns = evolutionEntryList.get(evolutionEntryList.size() - 1).removed
+      thriftTable.fact_table.table_columns.asScala.foreach { columnSchema =>
+        removedColumns.asScala.foreach { removedColumn =>
+          if (columnSchema.column_id.equalsIgnoreCase(removedColumn.column_id) &&
+              !columnSchema.isInvisible) {
+            columnSchema.setData_type(removedColumn.data_type)
+            columnSchema.setPrecision(removedColumn.precision)
+            columnSchema.setScale(removedColumn.scale)
+          }
+        }
+      }
+      CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
+        thriftTable, carbonTable.getStorePath)(sparkSession)
+    }
+  }
+
 }
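Reviewer note (not part of the diff): the four revert helpers above repeat the same read-schema-then-compare-timestamp guard. A possible follow-up refactor (hypothetical, not in this patch) could factor it out; the guard below is exactly the check each helper performs today:

    // Hypothetical shared guard; names follow the patch above.
    private def withRevertGuard(dbName: String, tableName: String, lastUpdatedTime: Long)
      (revert: TableInfo => Unit): Unit = {
      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
        carbonTable.getCarbonTableIdentifier)
      val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
        .readSchemaFile(carbonTablePath.getSchemaFilePath)
      val history = thriftTable.fact_table.schema_evolution.schema_evolution_history
      // revert only if the failed command persisted an entry newer than the snapshot
      if (history.get(history.size() - 1).time_stamp > lastUpdatedTime) {
        revert(thriftTable)
      }
    }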
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
new file mode 100644
index 00000000000..958b426227c
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -0,0 +1,69 @@
+package org.apache.spark.carbondata.restructure
+
+import java.io.File
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.test.TestQueryExecutor
+import org.scalatest.BeforeAndAfterAll
+
+
+class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll() {
+    sql("drop table if exists reverttest")
+    sql(
+      "CREATE TABLE reverttest(intField int,stringField string,timestampField timestamp," +
+      "decimalField decimal(6,2)) STORED BY 'carbondata'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/restructure/data4.csv' INTO TABLE reverttest " +
+        s"options('FILEHEADER'='intField,stringField,timestampField,decimalField')")
+  }
+
+  test("test to revert newly added columns on failure") {
+    intercept[RuntimeException] {
+      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
+      sql(
+        "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
+        "('DICTIONARY_EXCLUDE'='newField','DEFAULT.VALUE.charfield'='def')")
+      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+      intercept[AnalysisException] {
+        sql("select newField from reverttest")
+      }
+    }
+  }
+
+  test("test to revert table name on failure") {
+    intercept[RuntimeException] {
+      new File(TestQueryExecutor.warehouse + "/reverttest_fail").mkdir()
+      sql("alter table reverttest rename to reverttest_fail")
+      new File(TestQueryExecutor.warehouse + "/reverttest_fail").delete()
+    }
+    val result = sql("select * from reverttest").count()
+    assert(result.equals(1L))
+  }
+
+  test("test to revert drop columns on failure") {
+    intercept[Exception] {
+      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
+      sql("Alter table reverttest drop columns(decimalField)")
+      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+    }
+    assert(sql("select decimalField from reverttest").count().equals(1L))
+  }
+
+  test("test to revert changed datatype on failure") {
+    intercept[Exception] {
+      hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
+      sql("Alter table reverttest change intField intfield bigint")
+      hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+    }
+    assert(
+      sql("select intfield from reverttest").schema.fields.apply(0).dataType.simpleString == "int")
+  }
+
+  override def afterAll() {
+    hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+    sql("drop table if exists reverttest")
+  }
+
+}
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 93d128291a9..c37ea1e7241 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConversions._

 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.{DataFrame, Row}

@@ -39,6 +40,8 @@ class QueryTest extends PlanTest {

   val sqlContext = TestQueryExecutor.INSTANCE.sqlContext

+  val hiveClient = sqlContext.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
   val resourcesPath = TestQueryExecutor.resourcesPath

   def sql(sqlText: String): DataFrame = TestQueryExecutor.INSTANCE.sql(sqlText)