Add caching of resolved relations in SnappySessionCatalog
- resolving relations can take a long time, especially for external file-based tables with a
  large number of partitions, because of the metadata gathering and processing involved; so add
  a cache of resolved relations in SnappySessionCatalog (see the sketch below)
- invalidate this cache whenever the ExternalCatalog is invalidated; in addition, check whether
  the CatalogTable looked up from the ExternalCatalog matches the one cached previously and,
  if not, invalidate and re-fetch -- this handles the case where the table was invalidated
  from another session
- also invalidate for inserts into hadoop/hive tables, since those add new files not present in
  the cached metadata and can also create new partitions
- added a dunit test to verify the above, i.e. correct results after adding new
  data/partitions from a different session
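A minimal, self-contained sketch of the caching pattern described in the first two points. It uses Guava's LoadingCache, as the actual change does, but the TableId/CatalogTable/ResolvedRelation types and the externalCatalog map are illustrative stand-ins rather than SnappyData APIs:

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import scala.collection.concurrent.TrieMap

object RelationCacheSketch {
  case class TableId(schema: String, table: String)
  case class CatalogTable(id: TableId, version: Long)   // stand-in for table metadata
  case class ResolvedRelation(meta: CatalogTable)       // stand-in for a resolved plan

  // stand-in for the external catalog: metadata lookups are cheap (cached separately in the
  // real code); the expensive part is resolving the relation (file listing, partitions, ...)
  val externalCatalog = TrieMap[TableId, CatalogTable]()

  private val cachedRelations: LoadingCache[TableId, ResolvedRelation] =
    CacheBuilder.newBuilder().maximumSize(256).build(
      new CacheLoader[TableId, ResolvedRelation] {
        override def load(id: TableId): ResolvedRelation =
          ResolvedRelation(externalCatalog(id))          // expensive resolution, done once
      })

  /** Return the cached relation, re-fetching if the catalog entry changed (e.g. another session). */
  def resolve(id: TableId): ResolvedRelation = {
    val current = externalCatalog(id)                    // cheap metadata lookup
    val cached = cachedRelations.getUnchecked(id)
    if (cached.meta != current) {
      cachedRelations.invalidate(id)
      cachedRelations.getUnchecked(id)
    } else cached
  }

  def invalidate(id: TableId): Unit = cachedRelations.invalidate(id)
  def invalidateAll(): Unit = cachedRelations.invalidateAll()
}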
sumwale committed Oct 1, 2021
1 parent 97078ae commit 8b43301
Showing 18 changed files with 151 additions and 36 deletions.
2 changes: 1 addition & 1 deletion aqp
@@ -17,11 +17,14 @@
package org.apache.spark.sql


import java.io.File

import com.pivotal.gemfirexd.internal.engine.Misc
import io.snappydata.Property
import io.snappydata.cluster.ClusterManagerTestBase
import io.snappydata.test.dunit.{AvailablePortHelper, SerializableCallable}
import io.snappydata.util.TestUtils
import org.apache.commons.io.FileUtils
import org.scalatest.Assertions

import org.apache.spark.internal.Logging
@@ -411,6 +414,54 @@ class ColumnBatchAndExternalTableDUnitTest(s: String) extends ClusterManagerTest
assert(expected - max <= TestUtils.defaultCores,
s"Lower limit of concurrent tasks = $expected, actual = $max")
}

def testExternalTableMetadataCacheWithInserts(): Unit = {
val dataDir = new File("extern1")
FileUtils.deleteQuietly(dataDir)
dataDir.mkdir()
// create external parquet table and insert some data
val session = new SnappySession(sc)
session.sql("create table extern1 (id long, data string, stat string) using parquet " +
s"options (path '${dataDir.getAbsolutePath}') partitioned by (stat)")
session.sql("insert into extern1 select id, 'data_' || id, 'stat' || (id % 10) " +
"from range(100000)")

// check results
assert(session.sql("select * from extern1 where stat = 'stat1'").collect().length === 10000)
assert(session.sql("select * from extern1 where stat = 'stat2'").collect().length === 10000)

// insert more data from another session
val session2 = new SnappySession(sc)
session2.sql("insert into extern1 select id, 'data_' || id, 'stat' || (id % 10) " +
"from range(10000)")

// check results
assert(session.sql("select * from extern1 where stat = 'stat1'").collect().length === 11000)
assert(session.sql("select * from extern1 where stat = 'stat2'").collect().length === 11000)
assert(session.sql("select * from extern1 where stat = 'stat3'").collect().length === 11000)
assert(session.sql("select * from extern1 where stat = 'stat11'").collect().length === 0)

// insert more data with new partitions
session2.sql("insert into extern1 select id, 'data_' || id, 'stat' || (id % 20) " +
"from range(10000)")

// check results
assert(session.sql("select * from extern1 where stat = 'stat1'").collect().length === 11500)
assert(session.sql("select * from extern1 where stat = 'stat2'").collect().length === 11500)
assert(session.sql("select * from extern1 where stat = 'stat3'").collect().length === 11500)
assert(session.sql("select * from extern1 where stat = 'stat11'").collect().length === 500)

assert(session2.sql("select * from extern1 where stat = 'stat1'").collect().length === 11500)
assert(session2.sql("select * from extern1 where stat = 'stat2'").collect().length === 11500)
assert(session2.sql("select * from extern1 where stat = 'stat3'").collect().length === 11500)
assert(session2.sql("select * from extern1 where stat = 'stat11'").collect().length === 500)

session.sql("drop table extern1")
session.clear()
session2.clear()

FileUtils.deleteDirectory(dataDir)
}
}

case class AirlineData(year: Int, month: Int, dayOfMonth: Int,
@@ -76,7 +76,7 @@ object SmartConnectorFunctions

val sc = SparkContext.getOrCreate(conf)
val snc = SnappyContext(sc)
snc.snappySession.externalCatalog.invalidateAll()
snc.snappySession.sessionCatalog.invalidateAll()
val sqlContext = new SparkSession(sc).sqlContext
val pw = new PrintWriter(new FileOutputStream(
new File(s"ValidateNWQueries_$tableType.out"), true))
@@ -793,9 +793,10 @@ class SnappyUnifiedMemoryManager private[memory](
numBytes: Long,
memoryMode: MemoryMode): Unit = {
// if UMM lock is already held, then release inline else enqueue and be done with it
if (Thread.holdsLock(this) || !pendingStorageMemoryReleases.offer(
(objectName, numBytes, memoryMode), 15, TimeUnit.SECONDS)) {
synchronized(releaseStorageMemoryForObject_(objectName, numBytes, memoryMode))
if (Thread.holdsLock(this)) synchronized {
releaseStorageMemoryForObject_(objectName, numBytes, memoryMode)
} else {
pendingStorageMemoryReleases.put((objectName, numBytes, memoryMode))
}
}
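The memory-manager hunk above replaces the timed offer-plus-fallback with a simpler rule: release inline when the current thread already holds the manager's monitor, otherwise hand the release to the pending queue with a blocking put so it is never dropped. A small stand-alone sketch of that pattern, with illustrative names rather than the real SnappyUnifiedMemoryManager API:

import java.util.concurrent.LinkedBlockingQueue

class PendingReleaseSketch {
  private val pendingReleases = new LinkedBlockingQueue[(String, Long)]()

  private def releaseInline(objectName: String, numBytes: Long): Unit = {
    // the real code adjusts the storage pool here; this sketch only records the call
    println(s"released $numBytes bytes for $objectName")
  }

  def releaseStorageMemory(objectName: String, numBytes: Long): Unit = {
    if (Thread.holdsLock(this)) synchronized {
      // already inside this monitor: re-entering synchronized is safe, so release directly
      releaseInline(objectName, numBytes)
    } else {
      // otherwise hand the release off to the background drainer; put() blocks rather than
      // timing out if the queue happens to be bounded and full
      pendingReleases.put(objectName -> numBytes)
    }
  }

  // a background drainer like this would apply queued releases under the monitor (sketch only)
  def drainLoop(): Unit = while (true) {
    val (name, bytes) = pendingReleases.take()
    synchronized(releaseInline(name, bytes))
  }
}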

@@ -824,7 +824,8 @@ class SnappyTableMutableAPISuite extends SnappyFunSuite with Logging with Before
val message = intercept[AnalysisException] {
df2.write.deleteFrom("col_table")
}.getMessage
assert(message.contains("column `pk3` cannot be resolved on the right side of the operation."))
assert(message.contains("column `pk3` cannot be resolved on the right side of the operation.")
|| message.contains("WHERE clause of the DELETE FROM statement must have all the key"))
}

test("Bug - SNAP-2157") {
@@ -908,7 +909,8 @@ class SnappyTableMutableAPISuite extends SnappyFunSuite with Logging with Before
}.getMessage

assert(message.contains("DeleteFrom operation requires " +
"key columns(s) or primary key defined on table."))
"key columns(s) or primary key defined on table.") ||
message.contains("WHERE clause of the DELETE FROM statement must have all the key"))
}


@@ -930,7 +932,8 @@ class SnappyTableMutableAPISuite extends SnappyFunSuite with Logging with Before
df2.write.deleteFrom("row_table")
}.getMessage

assert(message.contains("column `pk3` cannot be resolved on the right side of the operation."))
assert(message.contains("column `pk3` cannot be resolved on the right side of the operation.")
|| message.contains("WHERE clause of the DELETE FROM statement must have all the key"))
}

test("Delete From SQL using JDBC: row tables") {
@@ -72,7 +72,7 @@ class TestHiveSnappySession(@transient protected val sc: SparkContext,
sharedState.cacheManager.clearCache()
loadedTables.clear()
sessionCatalog.clearTempTables()
sessionCatalog.externalCatalog.invalidateAll()
sessionCatalog.invalidateAll()

FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
4 changes: 2 additions & 2 deletions core/src/main/scala/io/snappydata/Literals.scala
@@ -168,9 +168,9 @@ object Property extends Enumeration {
"(value in bytes or k/m/g suffixes for unit, min 1k). Default is 4MB.", Some("4m"))

val ResultPersistenceTimeout: SparkValue[Long] = Val[Long](
s"${Constant.SPARK_PREFIX}sql.ResultPersistenceTimeout",
s"${Constant.SPARK_PREFIX}sql.resultPersistenceTimeout",
s"Maximum duration in seconds for which results larger than ${MaxMemoryResultSize.name}" +
s"are held on disk after which they are cleaned up. Default is 3600s (1h).", Some(3600L))
s"are held on disk after which they are cleaned up. Default is 7200 (2h).", Some(7200L))

val DisableHashJoin: SQLValue[Boolean] = SQLVal[Boolean](
s"${Constant.PROPERTY_PREFIX}sql.disableHashJoin",
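The Literals.scala hunk above renames the result-persistence property key to camel case and raises its default from 3600s to 7200s. Assuming Constant.SPARK_PREFIX expands to "spark." (consistent with the other spark.sql.* properties in that file), the renamed key could be set like any other conf; a hypothetical usage:

// key spelling taken from the hunk above; the value is in seconds, new default 7200
val spark = org.apache.spark.sql.SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.resultPersistenceTimeout", "3600")
  .getOrCreate()
// or at runtime through SQL
spark.sql("set spark.sql.resultPersistenceTimeout=3600")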
@@ -415,7 +415,7 @@ class CachedDataFrame(snappySession: SnappySession, queryExecution: QueryExecuti
} catch {
case t: Throwable
if CachedDataFrame.isConnectorCatalogStaleException(t, snappySession) =>
snappySession.externalCatalog.invalidateAll()
snappySession.sessionCatalog.invalidateAll()
SnappySession.clearAllCache()
val execution =
snappySession.getContextObject[() => QueryExecution](SnappySession.ExecutionKey)
@@ -1005,7 +1005,7 @@ object CachedDataFrame
} catch {
case t: Throwable
if CachedDataFrame.isConnectorCatalogStaleException(t, snappySession) =>
snappySession.externalCatalog.invalidateAll()
snappySession.sessionCatalog.invalidateAll()
SnappySession.clearAllCache()
if (attempts < retryCount) {
Thread.sleep(attempts*100)
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/sql/SnappySession.scala
@@ -245,12 +245,11 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) {
val relations = plan.collect {
case _: Command => hasCommand = true; null
case u: UnresolvedRelation =>
val tableIdent = sessionCatalog.resolveTableIdentifier(u.tableIdentifier)
tableIdent.database.get -> tableIdent.table
sessionCatalog.resolveTableIdentifier(u.tableIdentifier)
}
if (hasCommand) externalCatalog.invalidateAll()
if (hasCommand) sessionCatalog.invalidateAll()
else if (relations.nonEmpty) {
relations.foreach(externalCatalog.invalidate)
relations.foreach(sessionCatalog.invalidate(_))
}
throw e
case _ =>
@@ -332,6 +331,7 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) {
@transient
private var sqlWarnings: SQLWarning = _

private[sql] var catalogInitialized: Boolean = _
private[sql] var hiveInitializing: Boolean = _

private[sql] def isHiveSupportEnabled(v: String): Boolean = Utils.toLowerCase(v) match {
@@ -1700,6 +1700,7 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) {
plan match {
case LogicalRelation(rls: RowLevelSecurityRelation, _, _) =>
rls.enableOrDisableRowLevelSecurity(tableIdent, enableRls)
sessionCatalog.invalidate(tableIdent)
externalCatalog.invalidateCaches(tableIdent.database.get -> tableIdent.table :: Nil)
case _ =>
throw new AnalysisException("ALTER TABLE enable/disable Row Level Security " +
@@ -110,7 +110,7 @@ case class CodegenSparkFallback(var child: SparkPlan,
result
} catch {
case t: Throwable if CachedDataFrame.isConnectorCatalogStaleException(t, session) =>
session.externalCatalog.invalidateAll()
session.sessionCatalog.invalidateAll()
SnappySession.clearAllCache()
throw CachedDataFrame.catalogStaleFailure(t, session)
} finally {
@@ -125,7 +125,7 @@
}

private def handleStaleCatalogException[T](f: SparkPlan => T, plan: SparkPlan, t: Throwable) = {
session.externalCatalog.invalidateAll()
session.sessionCatalog.invalidateAll()
SnappySession.clearAllCache()
// fail immediate for insert/update/delete, else retry entire query
val action = plan.find {
@@ -436,7 +436,7 @@ abstract class BaseColumnFormatRelation(
createExternalTableForColumnBatches(externalColumnTableName, conn)
// store schema will miss complex types etc, so use the user-provided one
val session = sqlContext.sparkSession.asInstanceOf[SnappySession]
session.externalCatalog.invalidate(schemaName -> tableName)
session.sessionCatalog.invalidate(TableIdentifier(tableName, Some(schemaName)))
_schema = userSchema
_relationInfoAndRegion = null
}
@@ -674,14 +674,16 @@ class SnappySessionState(val snappySession: SnappySession)
* Internal catalog for managing table and database states.
*/
override lazy val catalog: SnappySessionCatalog = {
new SnappySessionCatalog(
val sessionCatalog = new SnappySessionCatalog(
snappySession.sharedState.getExternalCatalogInstance(snappySession),
snappySession,
snappySession.sharedState.globalTempViewManager,
functionResourceLoader,
functionRegistry,
conf,
newHadoopConf())
snappySession.catalogInitialized = true
sessionCatalog
}

protected lazy val wrapperCatalog: SessionCatalogWrapper = {
@@ -701,7 +703,7 @@ class SnappySessionState(val snappySession: SnappySession)
python.ExtractPythonUDFs,
TokenizeSubqueries(snappySession),
EnsureRequirements(conf),
OptimizeSortAndFilePlans(conf),
OptimizeSortAndFilePlans(snappySession),
CollapseCollocatedPlans(snappySession),
CollapseCodegenStages(conf),
InsertCachedPlanFallback(snappySession, topLevel),
@@ -916,14 +918,28 @@ class SnappyAnalyzer(sessionState: SnappySessionState)
* Rule to replace Spark's SortExec plans with an optimized SnappySortExec (in SMJ for now).
* Also sets the "spark.task.cpus" property implicitly for file scans/writes.
*/
case class OptimizeSortAndFilePlans(conf: SnappyConf) extends Rule[SparkPlan] {
case class OptimizeSortAndFilePlans(session: SnappySession) extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case join@joins.SortMergeJoinExec(_, _, _, _, _, sort@SortExec(_, _, child, _)) =>
join.copy(right = SnappySortExec(sort, child))
case s@(_: FileSourceScanExec | _: HiveTableScanExec | _: InsertIntoHiveTable |
case i: InsertIntoHiveTable =>
val table = i.table.catalogTable
// invalidate meta-data since that can change after the insert
val tableIdentifier = session.sessionCatalog.resolveTableIdentifier(table.identifier)
session.externalCatalog.invalidate(tableIdentifier.database.get -> tableIdentifier.table)
session.sessionState.conf.setDynamicCpusPerTask()
i
case c@ExecutedCommandExec(i: InsertIntoHadoopFsRelationCommand) if i.catalogTable.isDefined =>
val table = i.catalogTable.get
// invalidate meta-data since that can change after the insert
val tableIdentifier = session.sessionCatalog.resolveTableIdentifier(table.identifier)
session.externalCatalog.invalidate(tableIdentifier.database.get -> tableIdentifier.table)
session.sessionState.conf.setDynamicCpusPerTask()
c
case s@(_: FileSourceScanExec | _: HiveTableScanExec |
ExecutedCommandExec(_: InsertIntoHadoopFsRelationCommand |
_: CreateHiveTableAsSelectCommand)) =>
conf.setDynamicCpusPerTask()
session.sessionState.conf.setDynamicCpusPerTask()
s
}
}
@@ -121,7 +121,7 @@ object ColumnTableBulkOps {
result
} catch {
case t: Throwable if CachedDataFrame.isConnectorCatalogStaleException(t, session) =>
session.externalCatalog.invalidateAll()
session.sessionCatalog.invalidateAll()
SnappySession.clearAllCache()
// throw failure immediately to keep it consistent with insert/update/delete
throw CachedDataFrame.catalogStaleFailure(t, session)
@@ -23,12 +23,13 @@ import java.sql.SQLException
import scala.util.control.NonFatal

import com.gemstone.gemfire.SystemFailure
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.pivotal.gemfirexd.Attribute
import com.pivotal.gemfirexd.internal.iapi.util.IdUtil
import io.snappydata.Constant
import io.snappydata.sql.catalog.CatalogObjectType.getTableType
import io.snappydata.sql.catalog.SnappyExternalCatalog.{DBTABLE_PROPERTY, getTableWithSchema}
import io.snappydata.sql.catalog.{CatalogObjectType, SnappyExternalCatalog}
import io.snappydata.sql.catalog.{CatalogObjectType, ConnectorExternalCatalog, SnappyExternalCatalog}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

@@ -96,6 +97,19 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog,
defaultName
}

/** A cache of Spark SQL data source tables that have been accessed. */
// noinspection UnstableApiUsage
protected[sql] val cachedDataSourceTables: LoadingCache[TableIdentifier, LogicalPlan] = {
val loader = new CacheLoader[TableIdentifier, LogicalPlan]() {
override def load(tableName: TableIdentifier): LogicalPlan = {
logDebug(s"Creating new cached data source for $tableName")
val table = externalCatalog.getTable(tableName.database.get, tableName.table)
new FindDataSourceTable(snappySession)(SimpleCatalogRelation(table.database, table))
}
}
CacheBuilder.newBuilder().maximumSize(ConnectorExternalCatalog.cacheSize >> 2).build(loader)
}

final def getCurrentSchema: String = getCurrentDatabase

/**
@@ -270,7 +284,11 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog,
final def resolveRelationWithAlias(tableIdent: TableIdentifier,
alias: Option[String] = None): LogicalPlan = {
// resolve the relation right away with alias around
new FindDataSourceTable(snappySession)(lookupRelation(tableIdent, alias))
lookupRelation(tableIdent, alias) match {
case lr: LogicalRelation => lr
case a: SubqueryAlias if a.child.isInstanceOf[LogicalRelation] => a
case r => new FindDataSourceTable(snappySession)(r)
}
}

/**
@@ -878,7 +896,17 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog,
getPolicyPlan(table)
} else {
view = None
SimpleCatalogRelation(schemaName, table)
if (DDLUtils.isDatasourceTable(table)) {
val resolved = TableIdentifier(tableName, Some(schemaName))
cachedDataSourceTables(resolved) match {
case lr: LogicalRelation
if lr.catalogTable.isDefined && (lr.catalogTable.get ne table) =>
// refresh since table metadata has changed
cachedDataSourceTables.invalidate(resolved)
cachedDataSourceTables(resolved)
case p => p
}
} else SimpleCatalogRelation(schemaName, table)
}
}
case Some(p) => p
@@ -914,13 +942,23 @@ class SnappySessionCatalog(val externalCatalog: SnappyExternalCatalog,
super.refreshTable(table)
} else {
val resolved = resolveTableIdentifier(table)
externalCatalog.invalidate(resolved.database.get -> resolved.table)
invalidate(resolved)
if (snappySession.enableHiveSupport) {
hiveSessionCatalog.refreshTable(resolved)
}
}
}

def invalidate(resolved: TableIdentifier, sessionOnly: Boolean = false): Unit = {
cachedDataSourceTables.invalidate(resolved)
if (!sessionOnly) externalCatalog.invalidate(resolved.database.get -> resolved.table)
}

def invalidateAll(sessionOnly: Boolean = false): Unit = {
cachedDataSourceTables.invalidateAll()
if (!sessionOnly) externalCatalog.invalidateAll()
}

def getDataSourceRelations[T](tableType: CatalogObjectType.Type): Seq[T] = {
externalCatalog.getAllTables().collect {
case table if tableType == CatalogObjectType.getTableType(table) =>
@@ -239,7 +239,9 @@ class SnappyConf(@transient val session: SnappySession)
key
}

case GAttr.USERNAME_ATTR | GAttr.USERNAME_ALT_ATTR | GAttr.PASSWORD_ATTR => key
case GAttr.USERNAME_ATTR | GAttr.USERNAME_ALT_ATTR | GAttr.PASSWORD_ATTR =>
if (session.catalogInitialized) session.sessionCatalog.invalidateAll(sessionOnly = true)
key

case _ if key.startsWith("spark.sql.aqp.") =>
session.clearPlanCache()