@@ -20,12 +20,12 @@ package org.apache.spark.sql.catalyst.catalog
2020import java .net .URI
2121import java .util .Locale
2222import java .util .concurrent .Callable
23- import javax .annotation .concurrent .GuardedBy
2423
2524import scala .collection .mutable
2625import scala .util .{Failure , Success , Try }
2726
2827import com .google .common .cache .{Cache , CacheBuilder }
28+ import javax .annotation .concurrent .GuardedBy
2929import org .apache .hadoop .conf .Configuration
3030import org .apache .hadoop .fs .Path
3131
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils
4141import org .apache .spark .sql .internal .SQLConf
4242import org .apache .spark .sql .types .{StructField , StructType }
4343
44+
4445object SessionCatalog {
4546 val DEFAULT_DATABASE = " default"
4647}
@@ -60,8 +61,8 @@ class SessionCatalog(
6061 hadoopConf : Configuration ,
6162 parser : ParserInterface ,
6263 functionResourceLoader : FunctionResourceLoader ) extends Logging {
63- import SessionCatalog ._
6464 import CatalogTypes .TablePartitionSpec
65+ import SessionCatalog ._
6566
6667 // For testing only.
6768 def this (
@@ -666,7 +667,28 @@ class SessionCatalog(
666667 } else if (name.database.isDefined || ! tempTables.contains(table)) {
667668 val tableNamePreprocessor = externalCatalog.getTableNamePreprocessor
668669 val tableNameInMetastore = tableNamePreprocessor(table)
669- val metadata = externalCatalog.getTable(db, tableNameInMetastore).withTableName(table)
670+
671+ // if the table name is of CSD's proprietary form, we remove version partitioning
672+ // information so that our custom hdfs file/dir selector path can be triggered.
673+ // this is to help CSD to transition to partition version scheme and maintain
674+ // backward compatibility for old queries
675+ val metadataLookup = externalCatalog.getTable(db, tableNameInMetastore).withTableName(table)
676+ val metadata = if (! tableNameInMetastore.equalsIgnoreCase(table) &&
677+ metadataLookup.partitionColumnNames.exists(_.equalsIgnoreCase(" version" ))) {
678+ metadataLookup.copy(
679+ partitionColumnNames = metadataLookup.partitionColumnNames.filter{ s =>
680+ ! s.equalsIgnoreCase(" version" )
681+ },
682+ schema = metadataLookup.schema.copy(
683+ fields = metadataLookup.schema.fields.filter { s =>
684+ ! s.name.equalsIgnoreCase(" version" )
685+ }
686+ )
687+ )
688+ } else {
689+ metadataLookup
690+ }
691+
670692 if (metadata.tableType == CatalogTableType .VIEW ) {
671693 val viewText = metadata.viewText.getOrElse(sys.error(" Invalid view without text." ))
672694 // The relation is a view, so we wrap the relation by:
0 commit comments