Skip to content

Commit f5c418d

Browse files
marmbrusrxin
authored andcommitted
[SQL] SPARK-1372 Support for caching and uncaching tables in a SQLContext.
This doesn't yet support different databases in Hive (though you can probably workaround this by calling `USE <dbname>`). However, given the time constraints for 1.0 I think its probably worth including this now and extending the functionality in the next release. Author: Michael Armbrust <michael@databricks.com> Closes #282 from marmbrus/cacheTables and squashes the following commits: 83785db [Michael Armbrust] Support for caching and uncaching tables in a SQLContext.
1 parent ada310a commit f5c418d

File tree

7 files changed

+169
-2
lines changed

7 files changed

+169
-2
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ trait Catalog {
3131
alias: Option[String] = None): LogicalPlan
3232

3333
def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit
34+
def unregisterTable(databaseName: Option[String], tableName: String): Unit
3435
}
3536

3637
class SimpleCatalog extends Catalog {
@@ -40,7 +41,7 @@ class SimpleCatalog extends Catalog {
4041
tables += ((tableName, plan))
4142
}
4243

43-
def dropTable(tableName: String) = tables -= tableName
44+
def unregisterTable(databaseName: Option[String], tableName: String) = { tables -= tableName }
4445

4546
def lookupRelation(
4647
databaseName: Option[String],
@@ -87,6 +88,10 @@ trait OverrideCatalog extends Catalog {
8788
plan: LogicalPlan): Unit = {
8889
overrides.put((databaseName, tableName), plan)
8990
}
91+
92+
override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
93+
overrides.remove((databaseName, tableName))
94+
}
9095
}
9196

9297
/**
@@ -104,4 +109,8 @@ object EmptyCatalog extends Catalog {
104109
def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = {
105110
throw new UnsupportedOperationException
106111
}
112+
113+
def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
114+
throw new UnsupportedOperationException
115+
}
107116
}

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@ import org.apache.spark.sql.catalyst.analysis._
2626
import org.apache.spark.sql.catalyst.dsl
2727
import org.apache.spark.sql.catalyst.expressions._
2828
import org.apache.spark.sql.catalyst.optimizer.Optimizer
29-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
29+
import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
3030
import org.apache.spark.sql.catalyst.rules.RuleExecutor
31+
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
3132
import org.apache.spark.sql.execution._
3233

3334
/**
@@ -111,6 +112,35 @@ class SQLContext(@transient val sparkContext: SparkContext)
111112
result
112113
}
113114

115+
/** Returns the specified table as a SchemaRDD */
116+
def table(tableName: String): SchemaRDD =
117+
new SchemaRDD(this, catalog.lookupRelation(None, tableName))
118+
119+
/** Caches the specified table in-memory. */
120+
def cacheTable(tableName: String): Unit = {
121+
val currentTable = catalog.lookupRelation(None, tableName)
122+
val asInMemoryRelation =
123+
InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
124+
125+
catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
126+
}
127+
128+
/** Removes the specified table from the in-memory cache. */
129+
def uncacheTable(tableName: String): Unit = {
130+
EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
131+
// This is kind of a hack to make sure that if this was just an RDD registered as a table,
132+
// we reregister the RDD as a table.
133+
case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
134+
inMem.cachedColumnBuffers.unpersist()
135+
catalog.unregisterTable(None, tableName)
136+
catalog.registerTable(None, tableName, SparkLogicalPlan(e))
137+
case SparkLogicalPlan(inMem: InMemoryColumnarTableScan) =>
138+
inMem.cachedColumnBuffers.unpersist()
139+
catalog.unregisterTable(None, tableName)
140+
case plan => throw new IllegalArgumentException(s"Table $tableName is not cached: $plan")
141+
}
142+
}
143+
114144
protected[sql] class SparkPlanner extends SparkStrategies {
115145
val sparkContext = self.sparkContext
116146

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql
19+
20+
import org.scalatest.FunSuite
21+
import org.apache.spark.sql.TestData._
22+
import org.apache.spark.sql.test.TestSQLContext
23+
import org.apache.spark.sql.execution.SparkLogicalPlan
24+
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
25+
26+
class CachedTableSuite extends QueryTest {
27+
TestData // Load test tables.
28+
29+
test("read from cached table and uncache") {
30+
TestSQLContext.cacheTable("testData")
31+
32+
checkAnswer(
33+
TestSQLContext.table("testData"),
34+
testData.collect().toSeq
35+
)
36+
37+
TestSQLContext.table("testData").queryExecution.analyzed match {
38+
case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
39+
case noCache => fail(s"No cache node found in plan $noCache")
40+
}
41+
42+
TestSQLContext.uncacheTable("testData")
43+
44+
checkAnswer(
45+
TestSQLContext.table("testData"),
46+
testData.collect().toSeq
47+
)
48+
49+
TestSQLContext.table("testData").queryExecution.analyzed match {
50+
case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
51+
fail(s"Table still cached after uncache: $cachePlan")
52+
case noCache => // Table uncached successfully
53+
}
54+
}
55+
56+
test("correct error on uncache of non-cached table") {
57+
intercept[IllegalArgumentException] {
58+
TestSQLContext.uncacheTable("testData")
59+
}
60+
}
61+
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,13 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
141141
*/
142142
override def registerTable(
143143
databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit = ???
144+
145+
/**
146+
* UNIMPLEMENTED: It needs to be decided how we will persist in-memory tables to the metastore.
147+
* For now, if this functionality is desired mix in the in-memory [[OverrideCatalog]].
148+
*/
149+
override def unregisterTable(
150+
databaseName: Option[String], tableName: String): Unit = ???
144151
}
145152

146153
object HiveMetastoreTypes extends RegexParsers {
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
238 val_238
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
238 val_238
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hive
19+
20+
import org.apache.spark.sql.execution.SparkLogicalPlan
21+
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
22+
import org.apache.spark.sql.hive.execution.HiveComparisonTest
23+
24+
class CachedTableSuite extends HiveComparisonTest {
25+
TestHive.loadTestTable("src")
26+
27+
test("cache table") {
28+
TestHive.cacheTable("src")
29+
}
30+
31+
createQueryTest("read from cached table",
32+
"SELECT * FROM src LIMIT 1")
33+
34+
test("check that table is cached and uncache") {
35+
TestHive.table("src").queryExecution.analyzed match {
36+
case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
37+
case noCache => fail(s"No cache node found in plan $noCache")
38+
}
39+
TestHive.uncacheTable("src")
40+
}
41+
42+
createQueryTest("read from uncached table",
43+
"SELECT * FROM src LIMIT 1")
44+
45+
test("make sure table is uncached") {
46+
TestHive.table("src").queryExecution.analyzed match {
47+
case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
48+
fail(s"Table still cached after uncache: $cachePlan")
49+
case noCache => // Table uncached successfully
50+
}
51+
}
52+
53+
test("correct error on uncache of non-cached table") {
54+
intercept[IllegalArgumentException] {
55+
TestHive.uncacheTable("src")
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)