[SPARK-32001][SQL]Create JDBC authentication provider developer API #29024

Closed · wants to merge 14 commits
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.security

/**
* There are cases when global JVM security configuration must be modified.
* In order to avoid race the modification must be synchronized with this.
*/
object SecurityConfigurationLock
@@ -54,7 +54,7 @@ class DB2KrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
JDBCOptions.JDBC_KEYTAB -> keytabFileName,
JDBCOptions.JDBC_PRINCIPAL -> principal
))
new DB2ConnectionProvider(null, options).getAdditionalProperties()
new DB2ConnectionProvider().getAdditionalProperties(options)
}

override def beforeContainerStart(
@@ -0,0 +1,6 @@
org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.DB2ConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.MariaDBConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.MSSQLConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.PostgresConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.OracleConnectionProvider
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
* Options for the JDBC data source.
*/
class JDBCOptions(
@transient val parameters: CaseInsensitiveMap[String])
val parameters: CaseInsensitiveMap[String])
extends Serializable with Logging {

import JDBCOptions._
@@ -209,7 +209,7 @@ class JDBCOptions(
}

class JdbcOptionsInWrite(
@transient override val parameters: CaseInsensitiveMap[String])
override val parameters: CaseInsensitiveMap[String])
extends JDBCOptions(parameters) {

import JDBCOptions._
@@ -63,7 +63,7 @@ object JdbcUtils extends Logging {
throw new IllegalStateException(
s"Did not find registered driver with class $driverClass")
}
val connection = ConnectionProvider.create(driver, options).getConnection()
val connection = ConnectionProvider.create(driver, options.parameters)
require(connection != null,
s"The driver could not open a JDBC connection. Check the URL: ${options.url}")

@@ -18,18 +18,30 @@
package org.apache.spark.sql.execution.datasources.jdbc.connection

import java.sql.{Connection, Driver}
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.jdbc.JdbcConnectionProvider

private[jdbc] class BasicConnectionProvider extends JdbcConnectionProvider with Logging {
/**
* Additional properties for data connection (Data source property takes precedence).
*/
def getAdditionalProperties(options: JDBCOptions): Properties = new Properties()

override def canHandle(driver: Driver, options: Map[String, String]): Boolean = {
val jdbcOptions = new JDBCOptions(options)
jdbcOptions.keytab == null || jdbcOptions.principal == null
}

private[jdbc] class BasicConnectionProvider(driver: Driver, options: JDBCOptions)
extends ConnectionProvider {
def getConnection(): Connection = {
val properties = getAdditionalProperties()
options.asConnectionProperties.entrySet().asScala.foreach { e =>
properties.put(e.getKey(), e.getValue())
override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
val jdbcOptions = new JDBCOptions(options)
val properties = getAdditionalProperties(jdbcOptions)
options.foreach { case(k, v) =>
properties.put(k, v)
}
driver.connect(options.url, properties)
logDebug(s"JDBC connection initiated with URL: ${jdbcOptions.url} and properties: $properties")
driver.connect(jdbcOptions.url, properties)
}
}
@@ -18,60 +18,45 @@
package org.apache.spark.sql.execution.datasources.jdbc.connection

import java.sql.{Connection, Driver}
import java.util.Properties
import java.util.ServiceLoader

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

/**
* Connection provider which opens connection toward various databases (database specific instance
* needed). If kerberos authentication required then it's the provider's responsibility to set all
* the parameters.
*/
private[jdbc] trait ConnectionProvider {
/**
* Additional properties for data connection (Data source property takes precedence).
*/
def getAdditionalProperties(): Properties = new Properties()
import scala.collection.mutable

/**
* Opens connection toward the database.
*/
def getConnection(): Connection
}
import org.apache.spark.internal.Logging
import org.apache.spark.security.SecurityConfigurationLock
import org.apache.spark.sql.jdbc.JdbcConnectionProvider
import org.apache.spark.util.Utils

private[jdbc] object ConnectionProvider extends Logging {
def create(driver: Driver, options: JDBCOptions): ConnectionProvider = {
if (options.keytab == null || options.principal == null) {
logDebug("No authentication configuration found, using basic connection provider")
new BasicConnectionProvider(driver, options)
} else {
logDebug("Authentication configuration found, using database specific connection provider")
options.driverClass match {
case PostgresConnectionProvider.driverClass =>
logDebug("Postgres connection provider found")
new PostgresConnectionProvider(driver, options)

case MariaDBConnectionProvider.driverClass =>
logDebug("MariaDB connection provider found")
new MariaDBConnectionProvider(driver, options)

case DB2ConnectionProvider.driverClass =>
logDebug("DB2 connection provider found")
new DB2ConnectionProvider(driver, options)

case MSSQLConnectionProvider.driverClass =>
logDebug("MS SQL connection provider found")
new MSSQLConnectionProvider(driver, options)

case OracleConnectionProvider.driverClass =>
logDebug("Oracle connection provider found")
new OracleConnectionProvider(driver, options)

case _ =>
throw new IllegalArgumentException(s"Driver ${options.driverClass} does not support " +
"Kerberos authentication")
private val providers = loadProviders()

def loadProviders(): Seq[JdbcConnectionProvider] = {
val loader = ServiceLoader.load(classOf[JdbcConnectionProvider],
Utils.getContextOrSparkClassLoader)
val providers = mutable.ArrayBuffer[JdbcConnectionProvider]()

val iterator = loader.iterator
while (iterator.hasNext) {
try {
val provider = iterator.next
logDebug(s"Loaded built in provider: $provider")
providers += provider
} catch {
case t: Throwable =>
logError(s"Failed to load built in provider.", t)
Member:
I am getting the following exception on my console constantly while running JDBC tests. Should it really be logged as an error?

14:31:25.070 ERROR org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider: Failed to load built in provider.
java.util.ServiceConfigurationError: org.apache.spark.sql.jdbc.JdbcConnectionProvider: Provider org.apache.spark.sql.execution.datasources.jdbc.connection.IntentionallyFaultyConnectionProvider could not be instantiated
	at java.util.ServiceLoader.fail(ServiceLoader.java:232)
	at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider$.loadProviders(ConnectionProvider.scala:41)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider$.<init>(ConnectionProvider.scala:31)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider$.<clinit>(ConnectionProvider.scala)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:66)
	at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog.withConnection(JDBCTableCatalog.scala:156)
	at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog.listTables(JDBCTableCatalog.scala:58)
	at org.apache.spark.sql.execution.datasources.v2.ShowTablesExec.run(ShowTablesExec.scala:42)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3675)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:769)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3673)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:769)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:612)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:769)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:607)
	at org.apache.spark.sql.test.SQLTestUtilsBase.$anonfun$sql$1(SQLTestUtils.scala:231)
	at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalogSuite.$anonfun$new$2(JDBCTableCatalogSuite.scala:67)
	at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:134)
	at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalogSuite.$anonfun$new$1(JDBCTableCatalogSuite.scala:67)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:176)
	at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:61)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:61)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
	at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
	at org.scalatest.Suite.run(Suite.scala:1112)
	at org.scalatest.Suite.run$(Suite.scala:1094)
	at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
	at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:61)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:61)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:40)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
Caused by: java.lang.IllegalArgumentException: Intentional Exception
	at org.apache.spark.sql.execution.datasources.jdbc.connection.IntentionallyFaultyConnectionProvider.<init>(IntentionallyFaultyConnectionProvider.scala:26)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at java.lang.Class.newInstance(Class.java:442)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
	... 82 more

Member:
Maybe log it as a warning?

Contributor Author:
We can decrease it to a warning. The main point is to notify the user.

Member:
Is it okay to ignore the error case where it fails to load built-in providers? DataSource throws an exception if it fails to load built-in datasources:

      case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
        // NoClassDefFoundError's class name uses "/" rather than "." for packages
        val className = e.getMessage.replaceAll("/", ".")
        if (spark2RemovedClasses.contains(className)) {
          throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
            "Please check if your library is compatible with Spark 2.0", e)
        } else {
          throw e
        }

Contributor Author:
What do you mean by ignore? Providers must be loaded independently, so we need to catch and ignore the exception.

Contributor Author:
If the suggestion is to let the exception propagate, then I would say it's a bad idea. If one provider can't be loaded, the rest must still work. We've had a similar issue and the same expectation in the Hadoop delegation token area.

@maropu (Member, Oct 7, 2020):
Yea, the policy looks okay for secure connections, but how about the basic one, BasicConnectionProvider? At least, until Spark v3.0, creating basic JDBC connections does not fail because of the loading failure.

Contributor Author:
At least, until Spark v3.0, creating basic JDBC connections does not fail because of the loading failure.

Here it's the same, since BasicConnectionProvider is built-in and there is no pre-loading involved. The main issue would come when one adds a new provider which is unable to be loaded. That failure would make all the remaining workloads fail if we don't load the providers independently.

}
}
// Seems duplicate but it's needed for Scala 2.13
providers.toSeq
}

def create(driver: Driver, options: Map[String, String]): Connection = {
val filteredProviders = providers.filter(_.canHandle(driver, options))
require(filteredProviders.size == 1,
"JDBC connection initiated but not exactly one connection provider found which can handle " +
s"it. Found active providers: ${filteredProviders.mkString(", ")}")
SecurityConfigurationLock.synchronized {
@tdg5 (Contributor, Nov 22, 2021):
😭

I believe this synchronization introduces a significant performance bottleneck for applications that rely on being able to establish many connections simultaneously for performance reasons. This change forces concurrency to 1 when establishing database connections for a given JDBC driver, and that strikes me as a significant user-impacting change.

Can anyone propose a workaround for this? I have an app that makes connections to thousands of databases and I can't upgrade to any version >=3.1.x because of this significant bottleneck.

https://issues.apache.org/jira/browse/SPARK-37391

[screenshot: thread dump showing many threads blocked on SecurityConfigurationLock]

That screenshot only shows a handful of blocked threads, but in total I have 98 threads blocked waiting for the SecurityConfigurationLock.

I do not have this issue when running Spark 3.0.1 with no code changes.

Member:
@gaborgsomogyi is getConnection() not thread-safe?
Or is there any possibility of finer-grained locking on the actual object that getConnection() is called on?
We could move the filteredProviders.head out of the block, but I doubt that does anything.

Contributor Author:
is getConnection() not thread-safe?

Simply no.

This was well thought through initially, even though it introduces bottlenecks. The main problem is that the JVM has a single security context. In order to make a connection one MUST modify that security context, otherwise the JDBC connector is not able to provide the security credentials (TGT, keytab or whatever). Simply put, JDBC connectors can only obtain credentials from the single security context.

Since multiple connections are made concurrently (sometimes to the same DB type with different credentials), this must be synchronized to avoid a malformed security context (we ran a load test against 10+ databases and corrupted the security context pretty fast without the sync, and it was a horror to find out what was going on).
Workloads come and go; nobody can tell in advance what kind of security context needs to be created at JVM startup. If we were to pre-bake the security context (not sure how?!) then the following issues would arise:

  • How to handle 2 same type of databases (for example MariaDB) with 2 different credentials?
  • How to handle if a DB login credentials changed over time?

I personally think the first one can't be solved; the second one could be cured with JVM restarts, but that's just not user friendly.

If somebody has an excellent idea I would like to hear it.
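
To make the constraint concrete, here is a minimal sketch (not the actual Spark code; withJaasEntry is a hypothetical helper) of why every JAAS mutation has to go through a single JVM-wide lock such as SecurityConfigurationLock:

import javax.security.auth.login.Configuration

import org.apache.spark.security.SecurityConfigurationLock

// javax.security.auth.login.Configuration is a JVM-wide singleton, so two
// providers installing different JAAS entries concurrently can clobber each
// other unless every mutation happens under the same lock.
def withJaasEntry[T](newConfig: Configuration)(body: => T): T =
  SecurityConfigurationLock.synchronized {
    val old = Configuration.getConfiguration   // read the JVM-global config
    Configuration.setConfiguration(newConfig)  // JVM-global mutation
    try body finally Configuration.setConfiguration(old)  // restore afterwards
  }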

Contributor Author:
What I can imagine is pre-setting the JVM JAAS config for databases from a file, like this: java -Djava.security.auth.login.config=jaas.conf.
Then a new flag could be introduced, for example spark.jdbc.doNotSyncronize, which is default true.
In that case security credentials MUSTN'T be provided through Spark's JDBC properties but only from the JAAS file.
However, this would be super risky and for advanced users only. That said, I've spent at least a month debugging what goes on in the JVM security context...
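
For illustration only, such a pre-set JAAS file might contain an entry like the one below (the entry name matches what the MariaDB provider looks up; keytab path and principal are placeholders):

Krb5ConnectorContext {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/path/to/client.keytab"
  principal="client@EXAMPLE.COM";
};

Passed via java -Djava.security.auth.login.config=jaas.conf, the driver would find its login entry at startup without Spark having to touch the global security configuration at connection time.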

Contributor:
I don't know anything about the underlying problem, but it had crossed my mind that allowing the synchronization to be optional could be one path forward.

@gaborgsomogyi may I trouble you for some references related to

The main problem is that JVM has a single security context. In order to make a connection one MUST modify the security context, otherwise the JDBC connector is not able to provide the security credentials (TGT, keytab or whatever)

so I can better familiarize myself with the problem that you are describing?

Beyond this particular issue, what you've shared suggests that the concurrency utilized by my app could be causing us to cross-wire data, which would be a major problem.

I guess I'd also ask: is there more to it than you described? It sounds like I should either have some cross-wired data, or, if that's not the case, there is some missing piece of the puzzle that means the synchronization is not always required.

Thanks in advance!

@gaborgsomogyi (Contributor Author, Nov 24, 2021):
Unless somebody has a groundbreaking idea, providers must stay synchronized where security is enabled, but we can free up some time when no authentication is in place.
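
A minimal sketch of that idea (hypothetical, not the merged fix), assuming the secure case can be detected from the keytab/principal options:

import java.sql.{Connection, Driver}

import org.apache.spark.security.SecurityConfigurationLock
import org.apache.spark.sql.jdbc.JdbcConnectionProvider

// Only serialize connection creation when the provider has to touch
// JVM-global security configuration (e.g. Kerberos keytab + principal set).
def createConnection(
    provider: JdbcConnectionProvider,
    driver: Driver,
    options: Map[String, String]): Connection = {
  val needsGlobalLock = options.contains("keytab") && options.contains("principal")
  if (needsGlobalLock) {
    SecurityConfigurationLock.synchronized {
      provider.getConnection(driver, options)
    }
  } else {
    provider.getConnection(driver, options)
  }
}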

@HyukjinKwon (Member, Nov 24, 2021):
@tdg5 or @maropu would you be able to find some time for fixing this?

Contributor:
The proposed solution seems straightforward to me and I'm happy to take a stab at it. Dumb question though: how would I eventually tweak the Spark class loader so it would detect a custom JdbcConnectionProvider in my library code?

I tried baking such a class into the uberjar containing the driver app, but I didn't seem to be able to get ConnectionProvider to notice my clone of BasicConnectionProvider. I was guessing this was due to a class loader disagreement.

My spark-submit looks roughly like so:

spark-submit \
  --class theThing \
  --master 'local[*]' \
  --driver-memory 2G \
  --driver-class-path theUberjar.jar \
  theUberjar.jar

@gaborgsomogyi (Contributor Author, Nov 25, 2021):
@tdg5 Thanks for lending a helping hand! For such scenarios I've created a readme here.
Please don't forget to update this doc accordingly.
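
As a hedged sketch of that scenario (package, class name, and URL prefix below are made up; the method signatures follow the JdbcConnectionProvider API added in this PR), a custom provider might look like:

package com.example.jdbc

import java.sql.{Connection, Driver}
import java.util.Properties

import org.apache.spark.sql.jdbc.JdbcConnectionProvider

// Hypothetical provider that handles URLs of the form jdbc:mydb:...
class MyConnectionProvider extends JdbcConnectionProvider {
  override def canHandle(driver: Driver, options: Map[String, String]): Boolean =
    options.get("url").exists(_.startsWith("jdbc:mydb:"))

  override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
    val props = new Properties()
    options.foreach { case (k, v) => props.put(k, v) }
    driver.connect(options("url"), props)
  }
}

The provider is then registered through a classpath resource file (the META-INF services entry mentioned in the next comment):

# src/main/resources/META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider
com.example.jdbc.MyConnectionProvider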

Contributor:
@gaborgsomogyi thanks! The META-INF manifest is what I was missing 👍🏻

Here's my stab at what you suggested: #34745

I marked it a WIP, but it is complete as far as I am concerned and aware. I just want to make sure it matches up with what you had in mind before I open it up to broader review.

filteredProviders.head.getConnection(driver, options)
}
}
}
@@ -25,37 +25,36 @@ import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

private[sql] class DB2ConnectionProvider(driver: Driver, options: JDBCOptions)
extends SecureConnectionProvider(driver, options) {
override val appEntry: String = "JaasClient"

override def getConnection(): Connection = {
setAuthenticationConfigIfNeeded()
UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs(
new PrivilegedExceptionAction[Connection]() {
override def run(): Connection = {
DB2ConnectionProvider.super.getConnection()
private[sql] class DB2ConnectionProvider extends SecureConnectionProvider {
override val driverClass = "com.ibm.db2.jcc.DB2Driver"

override def appEntry(driver: Driver, options: JDBCOptions): String = "JaasClient"

override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
val jdbcOptions = new JDBCOptions(options)
setAuthenticationConfigIfNeeded(driver, jdbcOptions)
UserGroupInformation.loginUserFromKeytabAndReturnUGI(jdbcOptions.principal, jdbcOptions.keytab)
.doAs(
new PrivilegedExceptionAction[Connection]() {
override def run(): Connection = {
DB2ConnectionProvider.super.getConnection(driver, options)
}
}
}
)
)
}

override def getAdditionalProperties(): Properties = {
override def getAdditionalProperties(options: JDBCOptions): Properties = {
val result = new Properties()
// 11 is the integer value for kerberos
result.put("securityMechanism", new String("11"))
result.put("KerberosServerPrincipal", options.principal)
result
}

override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized {
val (parent, configEntry) = getConfigWithAppEntry()
override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = {
val (parent, configEntry) = getConfigWithAppEntry(driver, options)
if (configEntry == null || configEntry.isEmpty) {
setAuthenticationConfig(parent)
setAuthenticationConfig(parent, driver, options)
}
}
}

private[sql] object DB2ConnectionProvider {
val driverClass = "com.ibm.db2.jcc.DB2Driver"
}
@@ -25,12 +25,11 @@ import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

private[sql] class MSSQLConnectionProvider(
driver: Driver,
options: JDBCOptions,
parserMethod: String = "parseAndMergeProperties"
) extends SecureConnectionProvider(driver, options) {
override val appEntry: String = {
private[sql] class MSSQLConnectionProvider extends SecureConnectionProvider {
override val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
val parserMethod: String = "parseAndMergeProperties"

override def appEntry(driver: Driver, options: JDBCOptions): String = {
val configName = "jaasConfigurationName"
val appEntryDefault = "SQLJDBCDriver"

@@ -58,27 +57,29 @@ private[sql] class MSSQLConnectionProvider(
}
}

override def getConnection(): Connection = {
setAuthenticationConfigIfNeeded()
UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs(
new PrivilegedExceptionAction[Connection]() {
override def run(): Connection = {
MSSQLConnectionProvider.super.getConnection()
override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
val jdbcOptions = new JDBCOptions(options)
setAuthenticationConfigIfNeeded(driver, jdbcOptions)
UserGroupInformation.loginUserFromKeytabAndReturnUGI(jdbcOptions.principal, jdbcOptions.keytab)
.doAs(
new PrivilegedExceptionAction[Connection]() {
override def run(): Connection = {
MSSQLConnectionProvider.super.getConnection(driver, options)
}
}
}
)
)
}

override def getAdditionalProperties(): Properties = {
override def getAdditionalProperties(options: JDBCOptions): Properties = {
val result = new Properties()
// These props needed to reach internal kerberos authentication in the JDBC driver
result.put("integratedSecurity", "true")
result.put("authenticationScheme", "JavaKerberos")
result
}

override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized {
val (parent, configEntry) = getConfigWithAppEntry()
override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = {
val (parent, configEntry) = getConfigWithAppEntry(driver, options)
/**
* Couple of things to mention here (v8.2.2 client):
* 1. MS SQL supports JAAS application name configuration
@@ -87,11 +88,7 @@ private[sql] class MSSQLConnectionProvider(
val entryUsesKeytab = configEntry != null &&
configEntry.exists(_.getOptions().get("useKeyTab") == "true")
if (configEntry == null || configEntry.isEmpty || !entryUsesKeytab) {
setAuthenticationConfig(parent)
setAuthenticationConfig(parent, driver, options)
}
}
}

private[sql] object MSSQLConnectionProvider {
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
@@ -21,14 +21,14 @@ import java.sql.Driver

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

private[jdbc] class MariaDBConnectionProvider(driver: Driver, options: JDBCOptions)
extends SecureConnectionProvider(driver, options) {
override val appEntry: String = {
private[jdbc] class MariaDBConnectionProvider extends SecureConnectionProvider {
override val driverClass = "org.mariadb.jdbc.Driver"

override def appEntry(driver: Driver, options: JDBCOptions): String =
"Krb5ConnectorContext"
}

override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized {
val (parent, configEntry) = getConfigWithAppEntry()
override def setAuthenticationConfigIfNeeded(driver: Driver, options: JDBCOptions): Unit = {
val (parent, configEntry) = getConfigWithAppEntry(driver, options)
/**
* Couple of things to mention here (v2.5.4 client):
* 1. MariaDB doesn't support JAAS application name configuration
@@ -37,11 +37,7 @@ private[jdbc] class MariaDBConnectionProvider(driver: Driver, options: JDBCOptio
val entryUsesKeytab = configEntry != null &&
configEntry.exists(_.getOptions().get("useKeyTab") == "true")
if (configEntry == null || configEntry.isEmpty || !entryUsesKeytab) {
setAuthenticationConfig(parent)
setAuthenticationConfig(parent, driver, options)
}
}
}

private[sql] object MariaDBConnectionProvider {
val driverClass = "org.mariadb.jdbc.Driver"
}