Commit 2a4f09b

added support to revert changes if query fails
kunal642 authored and ravipesala committed Apr 6, 2017
1 parent bbade2a commit 2a4f09b
Showing 8 changed files with 303 additions and 32 deletions.
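Editor's note: all four ALTER TABLE commands touched by this commit (add columns, rename table, drop columns, change data type) follow the same recovery pattern: capture the table's last-updated timestamp before touching the schema, and on any failure call an AlterTableUtil revert helper that restores the schema written before the alter began. A minimal sketch of the pattern follows; only the lock acquisition, the timestamp capture, and the revert call appear verbatim in the diffs below, while the try-body placeholder and the releaseLocks helper are assumptions.

    // Minimal sketch of the revert-on-failure pattern (not the committed code).
    val locks = AlterTableUtil
      .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime // pre-alter timestamp
    try {
      // ... append a SchemaEvolutionEntry and rewrite the schema file ...
    } catch {
      case e: Exception =>
        // strip whatever the failed alter wrote after lastUpdatedTime
        AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
        sys.error("Alter table add column operation failed. Please check the logs")
    } finally {
      AlterTableUtil.releaseLocks(locks) // assumed helper; the diffs only show a comment here
    }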
@@ -404,8 +404,10 @@ private BucketingInfo fromExternalToWarpperBucketingInfo(
       org.apache.carbondata.format.TableInfo externalTableInfo, String dbName, String tableName,
       String storePath) {
     TableInfo wrapperTableInfo = new TableInfo();
+    List<org.apache.carbondata.format.SchemaEvolutionEntry> schemaEvolutionList =
+        externalTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history();
     wrapperTableInfo.setLastUpdatedTime(
-        externalTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history().get(0)
+        schemaEvolutionList.get(schemaEvolutionList.size() - 1)
             .getTime_stamp());
     wrapperTableInfo.setDatabaseName(dbName);
     wrapperTableInfo.setTableUniqueName(dbName + "_" + tableName);
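The hunk above fixes where the table's last-updated time is read from: schema_evolution_history gains one entry per schema change, so index 0 holds the creation entry while the newest timestamp sits at the end. For example (same accessors as in the hunk, shown here in Scala):

    // history grows by one entry per schema change, so the latest timestamp
    // is the last element, not element 0 (which dates from table creation)
    val history = externalTableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history
    val createdTime = history.get(0).getTime_stamp                    // table creation
    val lastUpdated = history.get(history.size() - 1).getTime_stamp   // most recent change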
@@ -573,10 +573,10 @@ public static int compareDoubleWithNan(Double d1, Double d2) {
    * Below method will be used to convert the data into byte[]
    *
    * @param data
-   * @param actualDataType actual data type
+   * @param ColumnSchema
    * @return actual data in byte[]
    */
-  public static byte[] convertDataToBytesBasedOnDataType(String data, DataType actualDataType) {
+  public static byte[] convertDataToBytesBasedOnDataType(String data, ColumnSchema columnSchema) {
     if (null == data) {
       return null;
     } else if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(data)) {
@@ -585,7 +585,7 @@ public static byte[] convertDataToBytesBasedOnDataType(String data, DataType act
     }
     try {
       long parsedIntVal = 0;
-      switch (actualDataType) {
+      switch (columnSchema.getDataType()) {
       case INT:
         parsedIntVal = (long) Integer.parseInt(data);
         return String.valueOf(parsedIntVal)
@@ -602,13 +602,17 @@ public static byte[] convertDataToBytesBasedOnDataType(String data, DataType act
             .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
       case DATE:
       case TIMESTAMP:
-        DirectDictionaryGenerator directDictionaryGenerator =
-            DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(actualDataType);
+        DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(columnSchema.getDataType());
         int value = directDictionaryGenerator.generateDirectSurrogateKey(data);
         return String.valueOf(value)
             .getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
       case DECIMAL:
-        java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data);
+        String parsedValue = parseStringToBigDecimal(data, columnSchema);
+        if (null == parsedValue) {
+          return null;
+        }
+        java.math.BigDecimal javaDecVal = new java.math.BigDecimal(parsedValue);
         return bigDecimalToByte(javaDecVal);
       default:
         return UTF8String.fromString(data).getBytes();
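The DECIMAL branch now routes through parseStringToBigDecimal, which this diff calls but does not show. A hypothetical sketch of what such a helper might do, assuming ColumnSchema exposes getPrecision and getScale for DECIMAL columns and that values which do not fit the declared precision map to null:

    // Hypothetical sketch only; the real parseStringToBigDecimal is not in this diff.
    def parseStringToBigDecimal(value: String, columnSchema: ColumnSchema): String = {
      try {
        val decimal = new java.math.BigDecimal(value)
          .setScale(columnSchema.getScale, java.math.RoundingMode.HALF_UP)
        // a value whose total digits exceed the declared precision cannot be stored
        if (decimal.precision() > columnSchema.getPrecision) null else decimal.toPlainString
      } catch {
        case _: NumberFormatException => null // unparsable input is treated like no default
      }
    }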
@@ -239,7 +239,7 @@ class AlterTableProcessor(
         if (elem._1.toLowerCase.startsWith(defaultValueString)) {
           if (col.getColumnName.equalsIgnoreCase(elem._1.substring(defaultValueString.length))) {
             rawData = elem._2
-            val data = DataTypeUtil.convertDataToBytesBasedOnDataType(elem._2, col.getDataType)
+            val data = DataTypeUtil.convertDataToBytesBasedOnDataType(elem._2, col)
             if (null != data) {
               col.setDefaultValue(data)
             } else {
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -56,6 +57,7 @@ private[sql] case class AlterTableAddColumns(
     // get the latest carbon table and check for column existence
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
+    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
     try {
       // read the latest schema file
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
@@ -90,16 +92,16 @@ private[sql] case class AlterTableAddColumns(
       LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
     } catch {
-      case e: Exception =>
-        LOGGER.error(e, s"Alter table add columns failed : ${e.getMessage}")
-        // clean up the dictionary files in case of any failure
+      case e: Exception => LOGGER
+        .error("Alter table add columns failed :" + e.getMessage)
         if (!newCols.isEmpty) {
           LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
           new AlterTableDropColumnRDD(sparkSession.sparkContext,
             newCols,
             carbonTable.getCarbonTableIdentifier,
             carbonTable.getStorePath).collect()
         }
+        AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
         sys.error("Alter table add column operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
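On failure the add-columns command now cleans up in two steps: delete any dictionary files already written for the new columns (the AlterTableDropColumnRDD call above), then roll the schema file back. The revert helper's body is not in this diff; a hypothetical sketch, assuming a CarbonEnv.get.carbonMetastore accessor and the readSchemaFile reader used elsewhere in this commit (only CarbonMetastore.revertTableSchema, added later in the commit, is real):

    // Hypothetical sketch of AlterTableUtil.revertAddColumnChanges.
    def revertAddColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
        (sparkSession: SparkSession): Unit = {
      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
      val tablePath = CarbonStorePath.getCarbonTablePath(
        carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier)
      val thriftTable = CarbonEnv.get.carbonMetastore // accessor name assumed
        .readSchemaFile(tablePath.getSchemaFilePath)
      val entries = thriftTable.fact_table.schema_evolution.schema_evolution_history
      // only revert if the failed alter actually appended a newer evolution entry
      if (entries.get(entries.size() - 1).getTime_stamp > lastUpdatedTime) {
        CarbonEnv.get.carbonMetastore.revertTableSchema(
          carbonTable.getCarbonTableIdentifier, thriftTable, carbonTable.getStorePath)(sparkSession)
      }
    }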
@@ -151,6 +153,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
         sparkSession)
     val carbonTable = relation.tableMeta.carbonTable
+    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
     try {
       // get the latest carbon table and check for column existence
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
@@ -160,15 +163,16 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
         .readSchemaFile(tableMetadataFile)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
       schemaEvolutionEntry.setTableName(newTableName)
       schemaEvolutionEntry.setTime_stamp(System.currentTimeMillis())
       renameBadRecords(oldTableName, newTableName, oldDatabaseName)
       val fileType = FileFactory.getFileType(tableMetadataFile)
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
         val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
           .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
             newTableName)
         if (!rename) {
-          sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
+          renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+          sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
         }
       }
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
@@ -190,8 +194,11 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
     } catch {
-      case e: Exception =>
-        LOGGER.error(e, s"Rename table failed: ${e.getMessage}")
+      case e: Exception => LOGGER
+        .error("Rename table failed: " + e.getMessage)
+        AlterTableUtil.revertRenameTableChanges(oldTableIdentifier, newTableName, lastUpdatedTime)(
+          sparkSession)
+        renameBadRecords(newTableName, oldTableName, oldDatabaseName)
         sys.error("Alter table rename table operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
@@ -240,9 +247,10 @@ private[sql] case class AlterTableDropColumns(
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     val locks = AlterTableUtil
       .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+    // get the latest carbon table and check for column existence
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
     try {
-      // get the latest carbon table and check for column existence
-      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
       // check each column existence in the table
       val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
       var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
@@ -269,12 +277,8 @@ private[sql] case class AlterTableDropColumns(
         }
       // take the total key column count. key column to be deleted should not
       // be >= key columns in schema
-      var totalKeyColumnInSchema = 0
-      tableColumns.foreach { tableColumn =>
-        // column should not be already deleted and should exist in the table
-        if (!tableColumn.isInvisible && tableColumn.isDimesion) {
-          totalKeyColumnInSchema += 1
-        }
+      val totalKeyColumnInSchema = tableColumns.count {
+        tableColumn => !tableColumn.isInvisible && tableColumn.isDimesion
       }
       if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
         sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
@@ -315,8 +319,9 @@ private[sql] case class AlterTableDropColumns(
       LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
     } catch {
-      case e: Exception =>
-        LOGGER.error(e, s"Alter table drop columns failed : ${e.getMessage}")
+      case e: Exception => LOGGER
+        .error("Alter table drop columns failed : " + e.getMessage)
+        AlterTableUtil.revertDropColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
         sys.error("Alter table drop column operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
@@ -339,11 +344,11 @@ private[sql] case class AlterTableDataTypeChange(
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     val locks = AlterTableUtil
       .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+    // get the latest carbon table and check for column existence
+    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
     try {
-      // get the latest carbon table and check for column existence
-      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
       val columnName = alterTableDataTypeChangeModel.columnName
-      var carbonColumnToBeModified: CarbonColumn = null
       val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)

       if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
@@ -393,8 +398,9 @@ private[sql] case class AlterTableDataTypeChange(
       LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
     } catch {
-      case e: Exception =>
-        LOGGER.error(e, s"Alter table change datatype failed : ${e.getMessage}")
+      case e: Exception => LOGGER
+        .error("Alter table change datatype failed : " + e.getMessage)
+        AlterTableUtil.revertDataTypeChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
         sys.error("Alter table data type change operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
@@ -301,7 +301,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
   }

   /**
-   * This method will overwrite the existing schema and update it with the gievn details
+   * This method will overwrite the existing schema and update it with the given details
    *
    * @param carbonTableIdentifier
    * @param thriftTableInfo
@@ -327,6 +327,34 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
       carbonTableIdentifier.getTableName)(sparkSession)
   }

+  /**
+   * This method is used to remove the evolution entry in case of failure.
+   *
+   * @param carbonTableIdentifier
+   * @param thriftTableInfo
+   * @param carbonStorePath
+   * @param sparkSession
+   */
+  def revertTableSchema(carbonTableIdentifier: CarbonTableIdentifier,
+      thriftTableInfo: org.apache.carbondata.format.TableInfo,
+      carbonStorePath: String)
+    (sparkSession: SparkSession): String = {
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl
+    val wrapperTableInfo = schemaConverter
+      .fromExternalToWrapperTableInfo(thriftTableInfo,
+        carbonTableIdentifier.getDatabaseName,
+        carbonTableIdentifier.getTableName,
+        carbonStorePath)
+    val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
+    evolutionEntries.remove(evolutionEntries.size() - 1)
+    createSchemaThriftFile(wrapperTableInfo,
+      thriftTableInfo,
+      carbonTableIdentifier.getDatabaseName,
+      carbonTableIdentifier.getTableName)(sparkSession)
+  }
+
+
   /**
    *
    * Prepare Thrift Schema from wrapper TableInfo and write to Schema file.
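revertTableSchema is the primitive the AlterTableUtil revert helpers can build on: convert the pre-alter thrift schema back to wrapper form, drop the evolution entry the failed operation appended, and rewrite the schema file. An illustrative invocation follows; the identifier values, schema path, and the CarbonEnv.get.carbonMetastore accessor are placeholders or assumptions, not taken from this diff.

    // Illustrative only: invoking the new revert primitive directly.
    val identifier = new CarbonTableIdentifier("default", "t1", "tableId") // placeholder identity
    val metastore = CarbonEnv.get.carbonMetastore // accessor name assumed
    val thriftTableInfo = metastore.readSchemaFile("/store/default/t1/Metadata/schema") // placeholder path
    metastore.revertTableSchema(identifier, thriftTableInfo, "/store")(sparkSession)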
