Skip to content

Commit cddb4b7

Browse files
hvanhovellrxin
authored andcommitted
[SPARK-20420][SQL] Add events to the external catalog
## What changes were proposed in this pull request? It is often useful to be able to track changes to the `ExternalCatalog`. This PR makes the `ExternalCatalog` emit events when a catalog object is changed. Events are fired before and after the change. The following events are fired per object: - Database - CreateDatabasePreEvent: event fired before the database is created. - CreateDatabaseEvent: event fired after the database has been created. - DropDatabasePreEvent: event fired before the database is dropped. - DropDatabaseEvent: event fired after the database has been dropped. - Table - CreateTablePreEvent: event fired before the table is created. - CreateTableEvent: event fired after the table has been created. - RenameTablePreEvent: event fired before the table is renamed. - RenameTableEvent: event fired after the table has been renamed. - DropTablePreEvent: event fired before the table is dropped. - DropTableEvent: event fired after the table has been dropped. - Function - CreateFunctionPreEvent: event fired before the function is created. - CreateFunctionEvent: event fired after the function has been created. - RenameFunctionPreEvent: event fired before the function is renamed. - RenameFunctionEvent: event fired after the function has been renamed. - DropFunctionPreEvent: event fired before the function is dropped. - DropFunctionPreEvent: event fired after the function has been dropped. The current events currently only contain the names of the object modified. We add more events, and more details at a later point. A user can monitor changes to the external catalog by adding a listener to the Spark listener bus checking for `ExternalCatalogEvent`s using the `SparkListener.onOtherEvent` hook. A more direct approach is add listener directly to the `ExternalCatalog`. ## How was this patch tested? Added the `ExternalCatalogEventSuite`. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #17710 from hvanhovell/SPARK-20420. (cherry picked from commit e2b3d23) Signed-off-by: Reynold Xin <rxin@databricks.com>
1 parent 6cd2f16 commit cddb4b7

File tree

6 files changed

+457
-25
lines changed

6 files changed

+457
-25
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

Lines changed: 76 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
2020
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
2121
import org.apache.spark.sql.catalyst.expressions.Expression
2222
import org.apache.spark.sql.types.StructType
23+
import org.apache.spark.util.ListenerBus
2324

2425
/**
2526
* Interface for the system catalog (of functions, partitions, tables, and databases).
@@ -30,7 +31,8 @@ import org.apache.spark.sql.types.StructType
3031
*
3132
* Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.
3233
*/
33-
abstract class ExternalCatalog {
34+
abstract class ExternalCatalog
35+
extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] {
3436
import CatalogTypes.TablePartitionSpec
3537

3638
protected def requireDbExists(db: String): Unit = {
@@ -61,9 +63,22 @@ abstract class ExternalCatalog {
6163
// Databases
6264
// --------------------------------------------------------------------------
6365

64-
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
66+
final def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
67+
val db = dbDefinition.name
68+
postToAll(CreateDatabasePreEvent(db))
69+
doCreateDatabase(dbDefinition, ignoreIfExists)
70+
postToAll(CreateDatabaseEvent(db))
71+
}
72+
73+
protected def doCreateDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit
74+
75+
final def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
76+
postToAll(DropDatabasePreEvent(db))
77+
doDropDatabase(db, ignoreIfNotExists, cascade)
78+
postToAll(DropDatabaseEvent(db))
79+
}
6580

66-
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
81+
protected def doDropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit
6782

6883
/**
6984
* Alter a database whose name matches the one specified in `dbDefinition`,
@@ -88,11 +103,39 @@ abstract class ExternalCatalog {
88103
// Tables
89104
// --------------------------------------------------------------------------
90105

91-
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
106+
final def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
107+
val db = tableDefinition.database
108+
val name = tableDefinition.identifier.table
109+
postToAll(CreateTablePreEvent(db, name))
110+
doCreateTable(tableDefinition, ignoreIfExists)
111+
postToAll(CreateTableEvent(db, name))
112+
}
92113

93-
def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
114+
protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
94115

95-
def renameTable(db: String, oldName: String, newName: String): Unit
116+
final def dropTable(
117+
db: String,
118+
table: String,
119+
ignoreIfNotExists: Boolean,
120+
purge: Boolean): Unit = {
121+
postToAll(DropTablePreEvent(db, table))
122+
doDropTable(db, table, ignoreIfNotExists, purge)
123+
postToAll(DropTableEvent(db, table))
124+
}
125+
126+
protected def doDropTable(
127+
db: String,
128+
table: String,
129+
ignoreIfNotExists: Boolean,
130+
purge: Boolean): Unit
131+
132+
final def renameTable(db: String, oldName: String, newName: String): Unit = {
133+
postToAll(RenameTablePreEvent(db, oldName, newName))
134+
doRenameTable(db, oldName, newName)
135+
postToAll(RenameTableEvent(db, oldName, newName))
136+
}
137+
138+
protected def doRenameTable(db: String, oldName: String, newName: String): Unit
96139

97140
/**
98141
* Alter a table whose database and name match the ones specified in `tableDefinition`, assuming
@@ -269,16 +312,40 @@ abstract class ExternalCatalog {
269312
// Functions
270313
// --------------------------------------------------------------------------
271314

272-
def createFunction(db: String, funcDefinition: CatalogFunction): Unit
315+
final def createFunction(db: String, funcDefinition: CatalogFunction): Unit = {
316+
val name = funcDefinition.identifier.funcName
317+
postToAll(CreateFunctionPreEvent(db, name))
318+
doCreateFunction(db, funcDefinition)
319+
postToAll(CreateFunctionEvent(db, name))
320+
}
273321

274-
def dropFunction(db: String, funcName: String): Unit
322+
protected def doCreateFunction(db: String, funcDefinition: CatalogFunction): Unit
275323

276-
def renameFunction(db: String, oldName: String, newName: String): Unit
324+
final def dropFunction(db: String, funcName: String): Unit = {
325+
postToAll(DropFunctionPreEvent(db, funcName))
326+
doDropFunction(db, funcName)
327+
postToAll(DropFunctionEvent(db, funcName))
328+
}
329+
330+
protected def doDropFunction(db: String, funcName: String): Unit
331+
332+
final def renameFunction(db: String, oldName: String, newName: String): Unit = {
333+
postToAll(RenameFunctionPreEvent(db, oldName, newName))
334+
doRenameFunction(db, oldName, newName)
335+
postToAll(RenameFunctionEvent(db, oldName, newName))
336+
}
337+
338+
protected def doRenameFunction(db: String, oldName: String, newName: String): Unit
277339

278340
def getFunction(db: String, funcName: String): CatalogFunction
279341

280342
def functionExists(db: String, funcName: String): Boolean
281343

282344
def listFunctions(db: String, pattern: String): Seq[String]
283345

346+
override protected def doPostEvent(
347+
listener: ExternalCatalogEventListener,
348+
event: ExternalCatalogEvent): Unit = {
349+
listener.onEvent(event)
350+
}
284351
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class InMemoryCatalog(
9898
// Databases
9999
// --------------------------------------------------------------------------
100100

101-
override def createDatabase(
101+
override protected def doCreateDatabase(
102102
dbDefinition: CatalogDatabase,
103103
ignoreIfExists: Boolean): Unit = synchronized {
104104
if (catalog.contains(dbDefinition.name)) {
@@ -119,7 +119,7 @@ class InMemoryCatalog(
119119
}
120120
}
121121

122-
override def dropDatabase(
122+
override protected def doDropDatabase(
123123
db: String,
124124
ignoreIfNotExists: Boolean,
125125
cascade: Boolean): Unit = synchronized {
@@ -180,7 +180,7 @@ class InMemoryCatalog(
180180
// Tables
181181
// --------------------------------------------------------------------------
182182

183-
override def createTable(
183+
override protected def doCreateTable(
184184
tableDefinition: CatalogTable,
185185
ignoreIfExists: Boolean): Unit = synchronized {
186186
assert(tableDefinition.identifier.database.isDefined)
@@ -221,7 +221,7 @@ class InMemoryCatalog(
221221
}
222222
}
223223

224-
override def dropTable(
224+
override protected def doDropTable(
225225
db: String,
226226
table: String,
227227
ignoreIfNotExists: Boolean,
@@ -264,7 +264,10 @@ class InMemoryCatalog(
264264
}
265265
}
266266

267-
override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized {
267+
override protected def doRenameTable(
268+
db: String,
269+
oldName: String,
270+
newName: String): Unit = synchronized {
268271
requireTableExists(db, oldName)
269272
requireTableNotExists(db, newName)
270273
val oldDesc = catalog(db).tables(oldName)
@@ -565,18 +568,21 @@ class InMemoryCatalog(
565568
// Functions
566569
// --------------------------------------------------------------------------
567570

568-
override def createFunction(db: String, func: CatalogFunction): Unit = synchronized {
571+
override protected def doCreateFunction(db: String, func: CatalogFunction): Unit = synchronized {
569572
requireDbExists(db)
570573
requireFunctionNotExists(db, func.identifier.funcName)
571574
catalog(db).functions.put(func.identifier.funcName, func)
572575
}
573576

574-
override def dropFunction(db: String, funcName: String): Unit = synchronized {
577+
override protected def doDropFunction(db: String, funcName: String): Unit = synchronized {
575578
requireFunctionExists(db, funcName)
576579
catalog(db).functions.remove(funcName)
577580
}
578581

579-
override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized {
582+
override protected def doRenameFunction(
583+
db: String,
584+
oldName: String,
585+
newName: String): Unit = synchronized {
580586
requireFunctionExists(db, oldName)
581587
requireFunctionNotExists(db, newName)
582588
val newFunc = getFunction(db, oldName).copy(identifier = FunctionIdentifier(newName, Some(db)))
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.catalyst.catalog
18+
19+
import org.apache.spark.scheduler.SparkListenerEvent
20+
21+
/**
22+
* Event emitted by the external catalog when it is modified. Events are either fired before or
23+
* after the modification (the event should document this).
24+
*/
25+
trait ExternalCatalogEvent extends SparkListenerEvent
26+
27+
/**
28+
* Listener interface for external catalog modification events.
29+
*/
30+
trait ExternalCatalogEventListener {
31+
def onEvent(event: ExternalCatalogEvent): Unit
32+
}
33+
34+
/**
35+
* Event fired when a database is create or dropped.
36+
*/
37+
trait DatabaseEvent extends ExternalCatalogEvent {
38+
/**
39+
* Database of the object that was touched.
40+
*/
41+
val database: String
42+
}
43+
44+
/**
45+
* Event fired before a database is created.
46+
*/
47+
case class CreateDatabasePreEvent(database: String) extends DatabaseEvent
48+
49+
/**
50+
* Event fired after a database has been created.
51+
*/
52+
case class CreateDatabaseEvent(database: String) extends DatabaseEvent
53+
54+
/**
55+
* Event fired before a database is dropped.
56+
*/
57+
case class DropDatabasePreEvent(database: String) extends DatabaseEvent
58+
59+
/**
60+
* Event fired after a database has been dropped.
61+
*/
62+
case class DropDatabaseEvent(database: String) extends DatabaseEvent
63+
64+
/**
65+
* Event fired when a table is created, dropped or renamed.
66+
*/
67+
trait TableEvent extends DatabaseEvent {
68+
/**
69+
* Name of the table that was touched.
70+
*/
71+
val name: String
72+
}
73+
74+
/**
75+
* Event fired before a table is created.
76+
*/
77+
case class CreateTablePreEvent(database: String, name: String) extends TableEvent
78+
79+
/**
80+
* Event fired after a table has been created.
81+
*/
82+
case class CreateTableEvent(database: String, name: String) extends TableEvent
83+
84+
/**
85+
* Event fired before a table is dropped.
86+
*/
87+
case class DropTablePreEvent(database: String, name: String) extends TableEvent
88+
89+
/**
90+
* Event fired after a table has been dropped.
91+
*/
92+
case class DropTableEvent(database: String, name: String) extends TableEvent
93+
94+
/**
95+
* Event fired before a table is renamed.
96+
*/
97+
case class RenameTablePreEvent(
98+
database: String,
99+
name: String,
100+
newName: String)
101+
extends TableEvent
102+
103+
/**
104+
* Event fired after a table has been renamed.
105+
*/
106+
case class RenameTableEvent(
107+
database: String,
108+
name: String,
109+
newName: String)
110+
extends TableEvent
111+
112+
/**
113+
* Event fired when a function is created, dropped or renamed.
114+
*/
115+
trait FunctionEvent extends DatabaseEvent {
116+
/**
117+
* Name of the function that was touched.
118+
*/
119+
val name: String
120+
}
121+
122+
/**
123+
* Event fired before a function is created.
124+
*/
125+
case class CreateFunctionPreEvent(database: String, name: String) extends FunctionEvent
126+
127+
/**
128+
* Event fired after a function has been created.
129+
*/
130+
case class CreateFunctionEvent(database: String, name: String) extends FunctionEvent
131+
132+
/**
133+
* Event fired before a function is dropped.
134+
*/
135+
case class DropFunctionPreEvent(database: String, name: String) extends FunctionEvent
136+
137+
/**
138+
* Event fired after a function has been dropped.
139+
*/
140+
case class DropFunctionEvent(database: String, name: String) extends FunctionEvent
141+
142+
/**
143+
* Event fired before a function is renamed.
144+
*/
145+
case class RenameFunctionPreEvent(
146+
database: String,
147+
name: String,
148+
newName: String)
149+
extends FunctionEvent
150+
151+
/**
152+
* Event fired after a function has been renamed.
153+
*/
154+
case class RenameFunctionEvent(
155+
database: String,
156+
name: String,
157+
newName: String)
158+
extends FunctionEvent

0 commit comments

Comments
 (0)