Commit e7cfb7d

colinmjj and Luan, Xuedong authored and committed
[CARMEL-7220] Backport Integration with admin and metadata cache notification (apache#48)
Co-authored-by: Luan, Xuedong <xuluan@ebay.com>
1 parent 4d6e3a5 commit e7cfb7d

File tree: 8 files changed, +139 -7 lines

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 18 additions & 0 deletions
@@ -80,6 +80,11 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
     addToQueue(listener, EVENT_LOG_QUEUE)
   }
 
+  /** Add a listener to the external catalog queue. */
+  def addToExternalCatalogQueue(listener: SparkListenerInterface): Unit = {
+    addToQueue(listener, EXTERNAL_CATALOG_QUEUE)
+  }
+
   /**
    * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
    * of each other (each one uses a separate thread for delivering events), allowing slower

@@ -158,6 +163,17 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
     }
   }
 
+  def postToQueue(event: SparkListenerEvent, queue: String): Unit = {
+    if (stopped.get()) {
+      return
+    }
+    queues.asScala.find(_.name == queue) match {
+      case Some(queue) =>
+        queue.post(event)
+      case None =>
+    }
+  }
+
   /**
    * Start sending events to attached listeners.
    *

@@ -257,6 +273,8 @@ private[spark] object LiveListenerBus {
   private[scheduler] val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement"
 
   private[scheduler] val EVENT_LOG_QUEUE = "eventLog"
+
+  private[scheduler] val EXTERNAL_CATALOG_QUEUE = "externalCatalog"
 }
 
 private[spark] class LiveListenerBusMetrics(conf: SparkConf)
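
Together, these additions let components outside the scheduler register listeners on, and target events at, the dedicated externalCatalog queue by name. A minimal usage sketch, assuming code compiled inside the org.apache.spark package (the listener bus and both new methods are private[spark]); MyCatalogEvent is a hypothetical event type used only for illustration:

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// Hypothetical event type, for illustration only.
case class MyCatalogEvent(database: String) extends SparkListenerEvent

def wireUp(sc: SparkContext): Unit = {
  val bus = sc.listenerBus
  // Registering a listener creates the "externalCatalog" queue if needed.
  bus.addToExternalCatalogQueue(new SparkListener {
    override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
      case MyCatalogEvent(db) => println(s"catalog changed in $db")
      case _ => // ignore all other events
    }
  })
  // Deliver to that one queue only; if the bus is stopped or the queue does
  // not exist yet, postToQueue silently drops the event.
  bus.postToQueue(MyCatalogEvent("db1"), "externalCatalog")
}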

core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala

Lines changed: 2 additions & 2 deletions
@@ -99,7 +99,7 @@ private[spark] trait UIRoot {
   def checkUIViewPermissions(appId: String, attemptId: Option[String], user: String): Boolean
 }
 
-private[v1] object UIRootFromServletContext {
+private[spark] object UIRootFromServletContext {
 
   private val attribute = getClass.getCanonicalName

@@ -112,7 +112,7 @@ private[v1] object UIRootFromServletContext {
   }
 }
 
-private[v1] trait ApiRequestContext {
+private[spark] trait ApiRequestContext {
   @Context
   protected var servletContext: ServletContext = _

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 20 additions & 1 deletion
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.SCRATCH_DIR
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._

@@ -73,7 +74,8 @@ class SessionCatalog(
     cacheSize: Int = SQLConf.get.tableRelationCacheSize,
     cacheTTL: Long = SQLConf.get.metadataCacheTTL,
     defaultDatabase: String = SQLConf.get.defaultDatabase,
-    scratchSessionPath: Option[Path] = None) extends SQLConfHelper with Logging {
+    scratchSessionPath: Option[Path] = None,
+    liveBus: Option[LiveListenerBus] = None) extends SQLConfHelper with Logging {
   import SessionCatalog._
   import CatalogTypes.TablePartitionSpec

@@ -219,6 +221,9 @@ class SessionCatalog(
     builder.build[QualifiedTableName, LogicalPlan]()
   }
 
+  private val listener = new ExternalCatalogListener(this)
+  liveBus.map(_.addToExternalCatalogQueue(listener))
+
   /** This method provides a way to get a cached plan. */
   def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
     tableRelationCache.get(t, c)

@@ -1999,3 +2004,17 @@ class SessionCatalog(
     }
   }
 }
+
+private class ExternalCatalogListener(catalog: SessionCatalog) extends SparkListener {
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    event match {
+      case e: AlterTableEvent => onAlterTable(e)
+      case _ => // Ignore
+    }
+  }
+
+  private def onAlterTable(e: AlterTableEvent): Unit = {
+    catalog.refreshTable(TableIdentifier(e.name, Some(e.database)))
+  }
+}
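
The net effect: a SessionCatalog built with a liveBus registers an ExternalCatalogListener on the externalCatalog queue, and any AlterTableEvent posted there invalidates the cached relation for that table via refreshTable. A sketch of the flow, assuming code inside the org.apache.spark package (SparkContext.listenerBus is private[spark]):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{AlterTableEvent, AlterTableKind}

def notifyAlter(spark: SparkSession, db: String, table: String): Unit = {
  // The ExternalCatalogListener registered above picks this up and calls
  // refreshTable(TableIdentifier(table, Some(db))), dropping the stale
  // tableRelationCache entry.
  spark.sparkContext.listenerBus.postToQueue(
    AlterTableEvent(db, table, AlterTableKind.TABLE), "externalCatalog")
}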

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExternalCatalogResource.scala

Lines changed: 88 additions & 0 deletions
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import javax.ws.rs._
+import javax.ws.rs.core.MediaType
+
+import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
+import org.glassfish.jersey.server.ServerProperties
+import org.glassfish.jersey.servlet.ServletContainer
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.catalog.{AlterTableEvent, AlterTableKind, DropDatabaseEvent, DropTableEvent}
+import org.apache.spark.sql.execution.ui.EventType.EventType
+import org.apache.spark.status.api.v1.{ApiRequestContext, UIRoot, UIRootFromServletContext}
+import org.apache.spark.ui.SparkUI
+
+object EventType extends Enumeration {
+  type EventType = Value
+  val ALTER_TABLE, DROP_TABLE, DROP_DATABASE = Value
+}
+
+case class CatalogEvent(eventType: EventType, database: String, name: String)
+
+case class RespondMessage(success: Boolean, msg: String)
+
+@Path("/api")
+private[spark] class ExternalCatalogResource extends ApiRequestContext with Logging {
+
+  private val EXTERNAL_CATALOG_QUEUE = "externalCatalog"
+
+  @POST
+  @Path("events")
+  @Produces(Array(MediaType.APPLICATION_JSON))
+  @Consumes(Array(MediaType.APPLICATION_JSON))
+  def postEvents(@FormParam("eventList") eventList: Seq[CatalogEvent]): RespondMessage = {
+    try {
+      uiRoot.asInstanceOf[SparkUI].sc.map { sparkContext =>
+        val listener = sparkContext.listenerBus
+        eventList.map { event =>
+          event.eventType match {
+            case EventType.ALTER_TABLE =>
+              val sparkEvent = AlterTableEvent(event.database, event.name, AlterTableKind.TABLE)
+              listener.postToQueue(sparkEvent, EXTERNAL_CATALOG_QUEUE)
+            case EventType.DROP_TABLE =>
+              listener.postToQueue(
+                DropTableEvent(event.database, event.name), EXTERNAL_CATALOG_QUEUE)
+            case EventType.DROP_DATABASE =>
+              listener.postToQueue(
+                DropDatabaseEvent(event.database), EXTERNAL_CATALOG_QUEUE)
+            case _ =>
+          }
+        }
+      }
+      RespondMessage(success = true, "Successful to post catalog events")
+    } catch {
+      case e: Exception =>
+        logWarning("Failed to post catalog events", e)
+        RespondMessage(success = false, "Failed to post catalog events")
+    }
+  }
+}
+
+private[spark] object ExternalCatalogResource {
+  def getServletHandler(uiRoot: UIRoot): ServletContextHandler = {
+    val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
+    jerseyContext.setContextPath("/catalog")
+    val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
+    holder.setInitParameter(ServerProperties.PROVIDER_PACKAGES, "org.apache.spark.status.api.v1")
+    UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
+    jerseyContext.addServlet(holder, "/*")
+    jerseyContext
+  }
+}
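
The new resource makes catalog notifications reachable over HTTP: getServletHandler mounts it under the /catalog context on the driver's web UI (wired up in SQLTab below), so events arrive at /catalog/api/events. A hypothetical client sketch; the driver host, the default UI port 4040, and the JSON encoding of the eventList form parameter are all assumptions, not part of this commit:

import java.net.{URI, URLEncoder}
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.nio.charset.StandardCharsets

object NotifyCatalogChange {
  def main(args: Array[String]): Unit = {
    // Assumed wire format: a JSON array of CatalogEvent objects.
    val events =
      """[{"eventType": "ALTER_TABLE", "database": "db1", "name": "t1"}]"""
    val body = "eventList=" + URLEncoder.encode(events, StandardCharsets.UTF_8)
    val request = HttpRequest.newBuilder()
      .uri(URI.create("http://driver-host:4040/catalog/api/events"))
      .header("Content-Type", "application/x-www-form-urlencoded")
      .POST(HttpRequest.BodyPublishers.ofString(body))
      .build()
    val response = HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
    // On success the server answers with a RespondMessage, e.g.
    // {"success": true, "msg": "Successful to post catalog events"}
    println(response.body())
  }
}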

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@ class SQLTab(val sqlStore: SQLAppStatusStore, sparkUI: SparkUI)
   parent.attachTab(this)
 
   parent.addStaticHandler(SQLTab.STATIC_RESOURCE_DIR, "/static/sql")
+  parent.attachHandler(ExternalCatalogResource.getServletHandler(parent))
 
   override def displayOrder: Int = 0
 }

sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala

Lines changed: 2 additions & 1 deletion
@@ -160,7 +160,8 @@ abstract class BaseSessionStateBuilder(
       sqlParser,
       resourceLoader,
       new SparkUDFExpressionBuilder,
-      scratchSessionPath = session.scratchSessionPath)
+      scratchSessionPath = session.scratchSessionPath,
+      liveBus = Some(session.sparkContext.listenerBus))
     parentState.foreach(_.catalog.copyStateTo(catalog))
     catalog
   }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala

Lines changed: 6 additions & 2 deletions
@@ -20,8 +20,10 @@ package org.apache.spark.sql.hive
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TableFunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 
 private[sql] class HiveSessionCatalog(

@@ -34,7 +36,8 @@ private[sql] class HiveSessionCatalog(
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader,
     functionExpressionBuilder: FunctionExpressionBuilder,
-    scratchSessionPath: Option[Path])
+    scratchSessionPath: Option[Path],
+    liveBus: Option[LiveListenerBus] = None)
   extends SessionCatalog(
     externalCatalogBuilder,
     globalTempViewManagerBuilder,

@@ -44,5 +47,6 @@ private[sql] class HiveSessionCatalog(
     parser,
     functionResourceLoader,
     functionExpressionBuilder,
-    scratchSessionPath = scratchSessionPath) {
+    scratchSessionPath = scratchSessionPath,
+    liveBus = liveBus) {
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala

Lines changed: 2 additions & 1 deletion
@@ -77,7 +77,8 @@ class HiveSessionStateBuilder(
       sqlParser,
       resourceLoader,
       HiveUDFExpressionBuilder,
-      scratchSessionPath = session.scratchSessionPath)
+      scratchSessionPath = session.scratchSessionPath,
+      liveBus = Some(session.sparkContext.listenerBus))
     parentState.foreach(_.catalog.copyStateTo(catalog))
     catalog
   }
