From 8b43301cc944ad0af919924018365febdfdbf60d Mon Sep 17 00:00:00 2001
From: Sumedh Wale
Date: Fri, 1 Oct 2021 02:51:52 +0530
Subject: [PATCH] Add caching of resolved relations in SnappySessionCatalog

- resolving relations, especially for cases like external file-based tables
  having a large number of partitions, can take a long time due to meta-data
  gathering and processing, so added a cache for resolved relations in
  SnappySessionCatalog
- invalidate this cache whenever the ExternalCatalog is invalidated; in
  addition, check whether the CatalogTable looked up from the ExternalCatalog
  matches the one cached previously and, if not, invalidate and re-fetch --
  this handles cases where the table was invalidated from another session
- also add invalidation for inserts into hadoop/hive tables, since those
  result in new files not present in the cached meta-data and can also create
  new partitions
- added a dunit test to check the above, i.e. correct results after adding
  new data/partitions from a different session
---
 aqp                                           |  2 +-
 ...ColumnBatchAndExternalTableDUnitTest.scala | 51 +++++++++++++++++++
 .../spark/sql/SmartConnectorFunctions.scala   |  2 +-
 .../memory/SnappyUnifiedMemoryManager.scala   |  7 +--
 .../SnappyTableMutableAPISuite.scala          |  9 ++--
 .../sql/hive/TestHiveSnappySession.scala      |  2 +-
 .../main/scala/io/snappydata/Literals.scala   |  4 +-
 .../apache/spark/sql/CachedDataFrame.scala    |  4 +-
 .../org/apache/spark/sql/SnappySession.scala  |  9 ++--
 .../sql/execution/CodegenSparkFallback.scala  |  4 +-
 .../columnar/impl/ColumnFormatRelation.scala  |  2 +-
 .../spark/sql/hive/SnappySessionState.scala   | 26 ++++++++--
 .../sql/internal/ColumnTableBulkOps.scala     |  2 +-
 .../sql/internal/SnappySessionCatalog.scala   | 46 +++++++++++++++--
 .../apache/spark/sql/internal/session.scala   |  4 +-
 .../apache/spark/sql/sources/interfaces.scala |  4 +-
 .../snappydata/ColumnUpdateDeleteTests.scala  |  2 +-
 ...SnappyStoreSinkProviderSecuritySuite.scala |  7 +--
 18 files changed, 151 insertions(+), 36 deletions(-)

diff --git a/aqp b/aqp
index 1c05060ebc..08d73fb23c 160000
--- a/aqp
+++ b/aqp
@@ -1 +1 @@
-Subproject commit 1c05060ebca9e2ec3773190d5b7fd8c988c1b21a
+Subproject commit 08d73fb23c77484e1163bdcde7948d95108ef594
diff --git a/cluster/src/dunit/scala/org/apache/spark/sql/ColumnBatchAndExternalTableDUnitTest.scala b/cluster/src/dunit/scala/org/apache/spark/sql/ColumnBatchAndExternalTableDUnitTest.scala
index 39eca56838..b004cd6a05 100644
--- a/cluster/src/dunit/scala/org/apache/spark/sql/ColumnBatchAndExternalTableDUnitTest.scala
+++ b/cluster/src/dunit/scala/org/apache/spark/sql/ColumnBatchAndExternalTableDUnitTest.scala
@@ -17,11 +17,14 @@
 package org.apache.spark.sql
 
+import java.io.File
+
 import com.pivotal.gemfirexd.internal.engine.Misc
 import io.snappydata.Property
 import io.snappydata.cluster.ClusterManagerTestBase
 import io.snappydata.test.dunit.{AvailablePortHelper, SerializableCallable}
 import io.snappydata.util.TestUtils
+import org.apache.commons.io.FileUtils
 import org.scalatest.Assertions
 
 import org.apache.spark.internal.Logging
@@ -411,6 +414,54 @@ class ColumnBatchAndExternalTableDUnitTest(s: String) extends ClusterManagerTest
     assert(expected - max <= TestUtils.defaultCores,
       s"Lower limit of concurrent tasks = $expected, actual = $max")
   }
+
+  def testExternalTableMetadataCacheWithInserts(): Unit = {
+    val dataDir = new File("extern1")
+    FileUtils.deleteQuietly(dataDir)
+    dataDir.mkdir()
+    // create external parquet table and insert some data
+    val session = new SnappySession(sc)
+    session.sql("create table 
extern1 (id long, data string, stat string) using parquet " + + s"options (path '${dataDir.getAbsolutePath}') partitioned by (stat)") + session.sql("insert into extern1 select id, 'data_' || id, 'stat' || (id % 10) " + + "from range(100000)") + + // check results + assert(session.sql("select * from extern1 where stat = 'stat1'").collect().length === 10000) + assert(session.sql("select * from extern1 where stat = 'stat2'").collect().length === 10000) + + // insert more data from another session + val session2 = new SnappySession(sc) + session2.sql("insert into extern1 select id, 'data_' || id, 'stat' || (id % 10) " + + "from range(10000)") + + // check results + assert(session.sql("select * from extern1 where stat = 'stat1'").collect().length === 11000) + assert(session.sql("select * from extern1 where stat = 'stat2'").collect().length === 11000) + assert(session.sql("select * from extern1 where stat = 'stat3'").collect().length === 11000) + assert(session.sql("select * from extern1 where stat = 'stat11'").collect().length === 0) + + // insert more data with new partitions + session2.sql("insert into extern1 select id, 'data_' || id, 'stat' || (id % 20) " + + "from range(10000)") + + // check results + assert(session.sql("select * from extern1 where stat = 'stat1'").collect().length === 11500) + assert(session.sql("select * from extern1 where stat = 'stat2'").collect().length === 11500) + assert(session.sql("select * from extern1 where stat = 'stat3'").collect().length === 11500) + assert(session.sql("select * from extern1 where stat = 'stat11'").collect().length === 500) + + assert(session2.sql("select * from extern1 where stat = 'stat1'").collect().length === 11500) + assert(session2.sql("select * from extern1 where stat = 'stat2'").collect().length === 11500) + assert(session2.sql("select * from extern1 where stat = 'stat3'").collect().length === 11500) + assert(session2.sql("select * from extern1 where stat = 'stat11'").collect().length === 500) + + session.sql("drop table extern1") + session.clear() + session2.clear() + + FileUtils.deleteDirectory(dataDir) + } } case class AirlineData(year: Int, month: Int, dayOfMonth: Int, diff --git a/cluster/src/dunit/scala/org/apache/spark/sql/SmartConnectorFunctions.scala b/cluster/src/dunit/scala/org/apache/spark/sql/SmartConnectorFunctions.scala index 098ebfeaf8..a65f751814 100644 --- a/cluster/src/dunit/scala/org/apache/spark/sql/SmartConnectorFunctions.scala +++ b/cluster/src/dunit/scala/org/apache/spark/sql/SmartConnectorFunctions.scala @@ -76,7 +76,7 @@ object SmartConnectorFunctions { val sc = SparkContext.getOrCreate(conf) val snc = SnappyContext(sc) - snc.snappySession.externalCatalog.invalidateAll() + snc.snappySession.sessionCatalog.invalidateAll() val sqlContext = new SparkSession(sc).sqlContext val pw = new PrintWriter(new FileOutputStream( new File(s"ValidateNWQueries_$tableType.out"), true)) diff --git a/cluster/src/main/scala/org/apache/spark/memory/SnappyUnifiedMemoryManager.scala b/cluster/src/main/scala/org/apache/spark/memory/SnappyUnifiedMemoryManager.scala index ff37f4301a..0931b12a36 100644 --- a/cluster/src/main/scala/org/apache/spark/memory/SnappyUnifiedMemoryManager.scala +++ b/cluster/src/main/scala/org/apache/spark/memory/SnappyUnifiedMemoryManager.scala @@ -793,9 +793,10 @@ class SnappyUnifiedMemoryManager private[memory]( numBytes: Long, memoryMode: MemoryMode): Unit = { // if UMM lock is already held, then release inline else enqueue and be done with it - if (Thread.holdsLock(this) || 
!pendingStorageMemoryReleases.offer( - (objectName, numBytes, memoryMode), 15, TimeUnit.SECONDS)) { - synchronized(releaseStorageMemoryForObject_(objectName, numBytes, memoryMode)) + if (Thread.holdsLock(this)) synchronized { + releaseStorageMemoryForObject_(objectName, numBytes, memoryMode) + } else { + pendingStorageMemoryReleases.put((objectName, numBytes, memoryMode)) } } diff --git a/cluster/src/test/scala/org/apache/spark/sql/execution/SnappyTableMutableAPISuite.scala b/cluster/src/test/scala/org/apache/spark/sql/execution/SnappyTableMutableAPISuite.scala index 610c3ddbd9..5a1024a5a5 100644 --- a/cluster/src/test/scala/org/apache/spark/sql/execution/SnappyTableMutableAPISuite.scala +++ b/cluster/src/test/scala/org/apache/spark/sql/execution/SnappyTableMutableAPISuite.scala @@ -824,7 +824,8 @@ class SnappyTableMutableAPISuite extends SnappyFunSuite with Logging with Before val message = intercept[AnalysisException] { df2.write.deleteFrom("col_table") }.getMessage - assert(message.contains("column `pk3` cannot be resolved on the right side of the operation.")) + assert(message.contains("column `pk3` cannot be resolved on the right side of the operation.") + || message.contains("WHERE clause of the DELETE FROM statement must have all the key")) } test("Bug - SNAP-2157") { @@ -908,7 +909,8 @@ class SnappyTableMutableAPISuite extends SnappyFunSuite with Logging with Before }.getMessage assert(message.contains("DeleteFrom operation requires " + - "key columns(s) or primary key defined on table.")) + "key columns(s) or primary key defined on table.") || + message.contains("WHERE clause of the DELETE FROM statement must have all the key")) } @@ -930,7 +932,8 @@ class SnappyTableMutableAPISuite extends SnappyFunSuite with Logging with Before df2.write.deleteFrom("row_table") }.getMessage - assert(message.contains("column `pk3` cannot be resolved on the right side of the operation.")) + assert(message.contains("column `pk3` cannot be resolved on the right side of the operation.") + || message.contains("WHERE clause of the DELETE FROM statement must have all the key")) } test("Delete From SQL using JDBC: row tables") { diff --git a/compatibilityTests/src/test/scala/org/apache/spark/sql/hive/TestHiveSnappySession.scala b/compatibilityTests/src/test/scala/org/apache/spark/sql/hive/TestHiveSnappySession.scala index 61bed2d283..f859dcea14 100644 --- a/compatibilityTests/src/test/scala/org/apache/spark/sql/hive/TestHiveSnappySession.scala +++ b/compatibilityTests/src/test/scala/org/apache/spark/sql/hive/TestHiveSnappySession.scala @@ -72,7 +72,7 @@ class TestHiveSnappySession(@transient protected val sc: SparkContext, sharedState.cacheManager.clearCache() loadedTables.clear() sessionCatalog.clearTempTables() - sessionCatalog.externalCatalog.invalidateAll() + sessionCatalog.invalidateAll() FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)). foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) } diff --git a/core/src/main/scala/io/snappydata/Literals.scala b/core/src/main/scala/io/snappydata/Literals.scala index 7cd9b6a420..b5db733630 100644 --- a/core/src/main/scala/io/snappydata/Literals.scala +++ b/core/src/main/scala/io/snappydata/Literals.scala @@ -168,9 +168,9 @@ object Property extends Enumeration { "(value in bytes or k/m/g suffixes for unit, min 1k). 
Default is 4MB.", Some("4m")) val ResultPersistenceTimeout: SparkValue[Long] = Val[Long]( - s"${Constant.SPARK_PREFIX}sql.ResultPersistenceTimeout", + s"${Constant.SPARK_PREFIX}sql.resultPersistenceTimeout", s"Maximum duration in seconds for which results larger than ${MaxMemoryResultSize.name}" + - s"are held on disk after which they are cleaned up. Default is 3600s (1h).", Some(3600L)) + s"are held on disk after which they are cleaned up. Default is 7200 (2h).", Some(7200L)) val DisableHashJoin: SQLValue[Boolean] = SQLVal[Boolean]( s"${Constant.PROPERTY_PREFIX}sql.disableHashJoin", diff --git a/core/src/main/scala/org/apache/spark/sql/CachedDataFrame.scala b/core/src/main/scala/org/apache/spark/sql/CachedDataFrame.scala index 763f99d8d0..222ec5caad 100644 --- a/core/src/main/scala/org/apache/spark/sql/CachedDataFrame.scala +++ b/core/src/main/scala/org/apache/spark/sql/CachedDataFrame.scala @@ -415,7 +415,7 @@ class CachedDataFrame(snappySession: SnappySession, queryExecution: QueryExecuti } catch { case t: Throwable if CachedDataFrame.isConnectorCatalogStaleException(t, snappySession) => - snappySession.externalCatalog.invalidateAll() + snappySession.sessionCatalog.invalidateAll() SnappySession.clearAllCache() val execution = snappySession.getContextObject[() => QueryExecution](SnappySession.ExecutionKey) @@ -1005,7 +1005,7 @@ object CachedDataFrame } catch { case t: Throwable if CachedDataFrame.isConnectorCatalogStaleException(t, snappySession) => - snappySession.externalCatalog.invalidateAll() + snappySession.sessionCatalog.invalidateAll() SnappySession.clearAllCache() if (attempts < retryCount) { Thread.sleep(attempts*100) diff --git a/core/src/main/scala/org/apache/spark/sql/SnappySession.scala b/core/src/main/scala/org/apache/spark/sql/SnappySession.scala index ffcd981f9a..ae173c4416 100644 --- a/core/src/main/scala/org/apache/spark/sql/SnappySession.scala +++ b/core/src/main/scala/org/apache/spark/sql/SnappySession.scala @@ -245,12 +245,11 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) { val relations = plan.collect { case _: Command => hasCommand = true; null case u: UnresolvedRelation => - val tableIdent = sessionCatalog.resolveTableIdentifier(u.tableIdentifier) - tableIdent.database.get -> tableIdent.table + sessionCatalog.resolveTableIdentifier(u.tableIdentifier) } - if (hasCommand) externalCatalog.invalidateAll() + if (hasCommand) sessionCatalog.invalidateAll() else if (relations.nonEmpty) { - relations.foreach(externalCatalog.invalidate) + relations.foreach(sessionCatalog.invalidate(_)) } throw e case _ => @@ -332,6 +331,7 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) { @transient private var sqlWarnings: SQLWarning = _ + private[sql] var catalogInitialized: Boolean = _ private[sql] var hiveInitializing: Boolean = _ private[sql] def isHiveSupportEnabled(v: String): Boolean = Utils.toLowerCase(v) match { @@ -1700,6 +1700,7 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) { plan match { case LogicalRelation(rls: RowLevelSecurityRelation, _, _) => rls.enableOrDisableRowLevelSecurity(tableIdent, enableRls) + sessionCatalog.invalidate(tableIdent) externalCatalog.invalidateCaches(tableIdent.database.get -> tableIdent.table :: Nil) case _ => throw new AnalysisException("ALTER TABLE enable/disable Row Level Security " + diff --git a/core/src/main/scala/org/apache/spark/sql/execution/CodegenSparkFallback.scala b/core/src/main/scala/org/apache/spark/sql/execution/CodegenSparkFallback.scala index 3892040e24..aea3078afd 
100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/CodegenSparkFallback.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/CodegenSparkFallback.scala @@ -110,7 +110,7 @@ case class CodegenSparkFallback(var child: SparkPlan, result } catch { case t: Throwable if CachedDataFrame.isConnectorCatalogStaleException(t, session) => - session.externalCatalog.invalidateAll() + session.sessionCatalog.invalidateAll() SnappySession.clearAllCache() throw CachedDataFrame.catalogStaleFailure(t, session) } finally { @@ -125,7 +125,7 @@ case class CodegenSparkFallback(var child: SparkPlan, } private def handleStaleCatalogException[T](f: SparkPlan => T, plan: SparkPlan, t: Throwable) = { - session.externalCatalog.invalidateAll() + session.sessionCatalog.invalidateAll() SnappySession.clearAllCache() // fail immediate for insert/update/delete, else retry entire query val action = plan.find { diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala index c99fabc043..ec4e1645e9 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala @@ -436,7 +436,7 @@ abstract class BaseColumnFormatRelation( createExternalTableForColumnBatches(externalColumnTableName, conn) // store schema will miss complex types etc, so use the user-provided one val session = sqlContext.sparkSession.asInstanceOf[SnappySession] - session.externalCatalog.invalidate(schemaName -> tableName) + session.sessionCatalog.invalidate(TableIdentifier(tableName, Some(schemaName))) _schema = userSchema _relationInfoAndRegion = null } diff --git a/core/src/main/scala/org/apache/spark/sql/hive/SnappySessionState.scala b/core/src/main/scala/org/apache/spark/sql/hive/SnappySessionState.scala index 95b72aca42..c39c7d8274 100644 --- a/core/src/main/scala/org/apache/spark/sql/hive/SnappySessionState.scala +++ b/core/src/main/scala/org/apache/spark/sql/hive/SnappySessionState.scala @@ -674,7 +674,7 @@ class SnappySessionState(val snappySession: SnappySession) * Internal catalog for managing table and database states. */ override lazy val catalog: SnappySessionCatalog = { - new SnappySessionCatalog( + val sessionCatalog = new SnappySessionCatalog( snappySession.sharedState.getExternalCatalogInstance(snappySession), snappySession, snappySession.sharedState.globalTempViewManager, @@ -682,6 +682,8 @@ class SnappySessionState(val snappySession: SnappySession) functionRegistry, conf, newHadoopConf()) + snappySession.catalogInitialized = true + sessionCatalog } protected lazy val wrapperCatalog: SessionCatalogWrapper = { @@ -701,7 +703,7 @@ class SnappySessionState(val snappySession: SnappySession) python.ExtractPythonUDFs, TokenizeSubqueries(snappySession), EnsureRequirements(conf), - OptimizeSortAndFilePlans(conf), + OptimizeSortAndFilePlans(snappySession), CollapseCollocatedPlans(snappySession), CollapseCodegenStages(conf), InsertCachedPlanFallback(snappySession, topLevel), @@ -916,14 +918,28 @@ class SnappyAnalyzer(sessionState: SnappySessionState) * Rule to replace Spark's SortExec plans with an optimized SnappySortExec (in SMJ for now). * Also sets the "spark.task.cpus" property implicitly for file scans/writes. 
*/ -case class OptimizeSortAndFilePlans(conf: SnappyConf) extends Rule[SparkPlan] { +case class OptimizeSortAndFilePlans(session: SnappySession) extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = plan.transformUp { case join@joins.SortMergeJoinExec(_, _, _, _, _, sort@SortExec(_, _, child, _)) => join.copy(right = SnappySortExec(sort, child)) - case s@(_: FileSourceScanExec | _: HiveTableScanExec | _: InsertIntoHiveTable | + case i: InsertIntoHiveTable => + val table = i.table.catalogTable + // invalidate meta-data since that can change after the insert + val tableIdentifier = session.sessionCatalog.resolveTableIdentifier(table.identifier) + session.externalCatalog.invalidate(tableIdentifier.database.get -> tableIdentifier.table) + session.sessionState.conf.setDynamicCpusPerTask() + i + case c@ExecutedCommandExec(i: InsertIntoHadoopFsRelationCommand) if i.catalogTable.isDefined => + val table = i.catalogTable.get + // invalidate meta-data since that can change after the insert + val tableIdentifier = session.sessionCatalog.resolveTableIdentifier(table.identifier) + session.externalCatalog.invalidate(tableIdentifier.database.get -> tableIdentifier.table) + session.sessionState.conf.setDynamicCpusPerTask() + c + case s@(_: FileSourceScanExec | _: HiveTableScanExec | ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand | _: CreateHiveTableAsSelectCommand)) => - conf.setDynamicCpusPerTask() + session.sessionState.conf.setDynamicCpusPerTask() s } } diff --git a/core/src/main/scala/org/apache/spark/sql/internal/ColumnTableBulkOps.scala b/core/src/main/scala/org/apache/spark/sql/internal/ColumnTableBulkOps.scala index a01147a37e..4bbf349e8a 100644 --- a/core/src/main/scala/org/apache/spark/sql/internal/ColumnTableBulkOps.scala +++ b/core/src/main/scala/org/apache/spark/sql/internal/ColumnTableBulkOps.scala @@ -121,7 +121,7 @@ object ColumnTableBulkOps { result } catch { case t: Throwable if CachedDataFrame.isConnectorCatalogStaleException(t, session) => - session.externalCatalog.invalidateAll() + session.sessionCatalog.invalidateAll() SnappySession.clearAllCache() // throw failure immediately to keep it consistent with insert/update/delete throw CachedDataFrame.catalogStaleFailure(t, session) diff --git a/core/src/main/scala/org/apache/spark/sql/internal/SnappySessionCatalog.scala b/core/src/main/scala/org/apache/spark/sql/internal/SnappySessionCatalog.scala index 8fe11dbce2..8f93a25dc2 100644 --- a/core/src/main/scala/org/apache/spark/sql/internal/SnappySessionCatalog.scala +++ b/core/src/main/scala/org/apache/spark/sql/internal/SnappySessionCatalog.scala @@ -23,12 +23,13 @@ import java.sql.SQLException import scala.util.control.NonFatal import com.gemstone.gemfire.SystemFailure +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import com.pivotal.gemfirexd.Attribute import com.pivotal.gemfirexd.internal.iapi.util.IdUtil import io.snappydata.Constant import io.snappydata.sql.catalog.CatalogObjectType.getTableType import io.snappydata.sql.catalog.SnappyExternalCatalog.{DBTABLE_PROPERTY, getTableWithSchema} -import io.snappydata.sql.catalog.{CatalogObjectType, SnappyExternalCatalog} +import io.snappydata.sql.catalog.{CatalogObjectType, ConnectorExternalCatalog, SnappyExternalCatalog} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -96,6 +97,19 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog, defaultName } + /** A cache of Spark SQL data source tables that have been accessed. 
*/ + // noinspection UnstableApiUsage + protected[sql] val cachedDataSourceTables: LoadingCache[TableIdentifier, LogicalPlan] = { + val loader = new CacheLoader[TableIdentifier, LogicalPlan]() { + override def load(tableName: TableIdentifier): LogicalPlan = { + logDebug(s"Creating new cached data source for $tableName") + val table = externalCatalog.getTable(tableName.database.get, tableName.table) + new FindDataSourceTable(snappySession)(SimpleCatalogRelation(table.database, table)) + } + } + CacheBuilder.newBuilder().maximumSize(ConnectorExternalCatalog.cacheSize >> 2).build(loader) + } + final def getCurrentSchema: String = getCurrentDatabase /** @@ -270,7 +284,11 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog, final def resolveRelationWithAlias(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan = { // resolve the relation right away with alias around - new FindDataSourceTable(snappySession)(lookupRelation(tableIdent, alias)) + lookupRelation(tableIdent, alias) match { + case lr: LogicalRelation => lr + case a: SubqueryAlias if a.child.isInstanceOf[LogicalRelation] => a + case r => new FindDataSourceTable(snappySession)(r) + } } /** @@ -878,7 +896,17 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog, getPolicyPlan(table) } else { view = None - SimpleCatalogRelation(schemaName, table) + if (DDLUtils.isDatasourceTable(table)) { + val resolved = TableIdentifier(tableName, Some(schemaName)) + cachedDataSourceTables(resolved) match { + case lr: LogicalRelation + if lr.catalogTable.isDefined && (lr.catalogTable.get ne table) => + // refresh since table metadata has changed + cachedDataSourceTables.invalidate(resolved) + cachedDataSourceTables(resolved) + case p => p + } + } else SimpleCatalogRelation(schemaName, table) } } case Some(p) => p @@ -914,13 +942,23 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog, super.refreshTable(table) } else { val resolved = resolveTableIdentifier(table) - externalCatalog.invalidate(resolved.database.get -> resolved.table) + invalidate(resolved) if (snappySession.enableHiveSupport) { hiveSessionCatalog.refreshTable(resolved) } } } + def invalidate(resolved: TableIdentifier, sessionOnly: Boolean = false): Unit = { + cachedDataSourceTables.invalidate(resolved) + if (!sessionOnly) externalCatalog.invalidate(resolved.database.get -> resolved.table) + } + + def invalidateAll(sessionOnly: Boolean = false): Unit = { + cachedDataSourceTables.invalidateAll() + if (!sessionOnly) externalCatalog.invalidateAll() + } + def getDataSourceRelations[T](tableType: CatalogObjectType.Type): Seq[T] = { externalCatalog.getAllTables().collect { case table if tableType == CatalogObjectType.getTableType(table) => diff --git a/core/src/main/scala/org/apache/spark/sql/internal/session.scala b/core/src/main/scala/org/apache/spark/sql/internal/session.scala index 6086eaa506..0b44005b24 100644 --- a/core/src/main/scala/org/apache/spark/sql/internal/session.scala +++ b/core/src/main/scala/org/apache/spark/sql/internal/session.scala @@ -239,7 +239,9 @@ class SnappyConf(@transient val session: SnappySession) key } - case GAttr.USERNAME_ATTR | GAttr.USERNAME_ALT_ATTR | GAttr.PASSWORD_ATTR => key + case GAttr.USERNAME_ATTR | GAttr.USERNAME_ALT_ATTR | GAttr.PASSWORD_ATTR => + if (session.catalogInitialized) session.sessionCatalog.invalidateAll(sessionOnly = true) + key case _ if key.startsWith("spark.sql.aqp.") => session.clearPlanCache() diff --git 
a/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index ca2757cb1a..cddce72651 100644 --- a/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -422,7 +422,9 @@ trait NativeTableRowLevelSecurityRelation // If the schema has changed (in smart connector) then execution should throw an exception // leading to a retry or a CatalogStaleException to fail the operation. val session = sqlContext.sparkSession.asInstanceOf[SnappySession] - if (invalidateCached) session.externalCatalog.invalidate(schemaName -> tableName) + if (invalidateCached) { + session.sessionCatalog.invalidate(TableIdentifier(tableName, Some(schemaName))) + } _relationInfoAndRegion = null if (fetchFromStore) { _schema = JdbcExtendedUtils.normalizeSchema(JDBCRDD.resolveTable(new JDBCOptions( diff --git a/core/src/test/scala/io/snappydata/ColumnUpdateDeleteTests.scala b/core/src/test/scala/io/snappydata/ColumnUpdateDeleteTests.scala index cf82b66a6e..c7c81d5e97 100644 --- a/core/src/test/scala/io/snappydata/ColumnUpdateDeleteTests.scala +++ b/core/src/test/scala/io/snappydata/ColumnUpdateDeleteTests.scala @@ -639,7 +639,7 @@ object ColumnUpdateDeleteTests extends Assertions with Logging { Await.result(runNonPartitioned, Duration.Inf) } - def testConcurrentUpdateDeleteForCompactionBody(session: SnappySession, + private def testConcurrentUpdateDeleteForCompactionBody(session: SnappySession, partitioned: Boolean, numRows: Int, barrier: CyclicBarrier): Unit = { // General idea is to have multiple threads perform deletes and updates stepping through // a range of values (different for each thread) to finally delete all data in the table diff --git a/core/src/test/scala/org/apache/spark/sql/streaming/SnappyStoreSinkProviderSecuritySuite.scala b/core/src/test/scala/org/apache/spark/sql/streaming/SnappyStoreSinkProviderSecuritySuite.scala index f6cde87644..b6534f5097 100644 --- a/core/src/test/scala/org/apache/spark/sql/streaming/SnappyStoreSinkProviderSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/streaming/SnappyStoreSinkProviderSecuritySuite.scala @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.reflect.io.Path +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl import com.pivotal.gemfirexd.Attribute import com.pivotal.gemfirexd.security.{LdapTestServer, SecurityTestUtils} import io.snappydata.{Constant, SnappyFunSuite} @@ -49,8 +50,8 @@ class SnappyStoreSinkProviderSecuritySuite extends SnappyFunSuite private def getTopic(id: Int) = s"topic-$id" - override def beforeAll() { - super.beforeAll() + override def beforeAll(): Unit = { + if (GemFireCacheImpl.getInstance ne null) super.beforeAll() this.stopAll() kafkaTestUtils = new KafkaTestUtils kafkaTestUtils.setup() @@ -61,7 +62,7 @@ class SnappyStoreSinkProviderSecuritySuite extends SnappyFunSuite session.sql(s"CREATE SCHEMA $ldapGroup AUTHORIZATION ldapgroup:$ldapGroup;") } - override def afterAll() { + override def afterAll(): Unit = { // setup session credentials for cleanup val session = this.snc.snappySession session.conf.set(Attribute.USERNAME_ATTR, sysUser)
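
For reference, below is an illustrative, self-contained Scala sketch of the caching scheme that the SnappySessionCatalog change above implements: a size-bounded Guava LoadingCache keyed by the table identifier, whose cached resolution is discarded and rebuilt when the entry currently in the external catalog no longer matches the one that was cached. CatalogEntry, ResolvedPlan and RelationCache are simplified hypothetical stand-ins for CatalogTable, the resolved LogicalPlan and the session catalog; they are not classes from this patch.

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

// Hypothetical stand-ins for CatalogTable and a resolved LogicalPlan; not SnappyData/Spark classes.
case class CatalogEntry(schema: String, table: String, version: Long)
case class ResolvedPlan(entry: CatalogEntry, partitions: Seq[String])

/**
 * Caches expensive relation resolution (e.g. partition discovery for external file-based
 * tables) and re-resolves when the entry in the external catalog no longer matches the
 * one that was cached.
 */
class RelationCache(externalLookup: ((String, String)) => CatalogEntry,
    resolve: CatalogEntry => ResolvedPlan, maxSize: Long = 256) {

  // LoadingCache keyed by (schema, table); the loader runs the expensive resolution
  // only on a cache miss
  private val cache: LoadingCache[(String, String), ResolvedPlan] =
    CacheBuilder.newBuilder().maximumSize(maxSize).build(
      new CacheLoader[(String, String), ResolvedPlan] {
        override def load(key: (String, String)): ResolvedPlan = resolve(externalLookup(key))
      })

  /** Return the cached plan, re-resolving if the external catalog entry changed meanwhile. */
  def lookup(schema: String, table: String): ResolvedPlan = {
    val key = schema -> table
    val current = externalLookup(key) // assumes the table exists in the external catalog
    val plan = cache.get(key)
    if (plan.entry != current) { // stale: another session changed the table meta-data
      cache.invalidate(key)
      cache.get(key)
    } else plan
  }

  def invalidate(schema: String, table: String): Unit = cache.invalidate(schema -> table)
  def invalidateAll(): Unit = cache.invalidateAll()
}

The patch itself keys its cache by TableIdentifier, sizes it from ConnectorExternalCatalog.cacheSize, and detects staleness by comparing CatalogTable instances by reference (ne), relying on the ExternalCatalog handing back the same cached instance until it is invalidated; the value comparison in the sketch is only a simplified stand-in for that check.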
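
And a hypothetical usage of the sketch above, mirroring what the new dunit test verifies: a change made through another session (new files and partitions, hence a changed catalog entry) makes the next lookup re-resolve instead of returning the stale cached plan.

object RelationCacheExample extends App {
  // mutable "external catalog" entry; bumping the version simulates meta-data
  // changing underneath the cache (e.g. an insert from another session)
  var entry = CatalogEntry("app", "extern1", version = 1L)

  val cache = new RelationCache(
    externalLookup = _ => entry,
    resolve = e => ResolvedPlan(e, partitions = (0 until 10).map(i => s"stat=stat$i")))

  val first = cache.lookup("app", "extern1") // resolved and cached (version 1)
  entry = CatalogEntry("app", "extern1", version = 2L) // "another session" adds data/partitions
  val second = cache.lookup("app", "extern1") // stale entry detected, re-resolved
  assert(first.entry.version == 1L && second.entry.version == 2L)
  println(s"re-resolved after external change: v${first.entry.version} -> v${second.entry.version}")
}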