[SPARK-32001][SQL]Create JDBC authentication provider developer API #29024

Closed · wants to merge 14 commits
Changes from 3 commits
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.security

import org.apache.spark.annotation.DeveloperApi

/**
* ::DeveloperApi::
* There are cases when the global JVM security configuration must be modified.
* In order to avoid races, the modification must be synchronized on this object.
*/
@DeveloperApi
object SecurityConfigurationLock
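
For illustration, a minimal sketch (hypothetical helper object, not part of this diff) of the locking pattern the secure providers below follow whenever they mutate the JVM-global JAAS configuration:

  import javax.security.auth.login.Configuration

  import org.apache.spark.security.SecurityConfigurationLock

  object JaasConfigExample {
    // Hypothetical helper: swap the JVM-global JAAS configuration while holding
    // the shared lock, so concurrently running connection providers cannot race.
    def setGlobalJaasConfig(config: Configuration): Unit =
      SecurityConfigurationLock.synchronized {
        Configuration.setConfiguration(config)
      }
  }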
@@ -54,7 +54,7 @@ class DB2KrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
JDBCOptions.JDBC_KEYTAB -> keytabFileName,
JDBCOptions.JDBC_PRINCIPAL -> principal
))
new DB2ConnectionProvider(null, options).getAdditionalProperties()
new DB2ConnectionProvider().getAdditionalProperties(options)
}

override def beforeContainerStart(
@@ -0,0 +1,6 @@
org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.DB2ConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.MariaDBConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.MSSQLConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.PostgresConnectionProvider
org.apache.spark.sql.execution.datasources.jdbc.connection.OracleConnectionProvider
@@ -23,12 +23,15 @@ import java.util.{Locale, Properties}
import org.apache.commons.io.FilenameUtils

import org.apache.spark.SparkFiles
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

/**
* ::DeveloperApi::
* Options for the JDBC data source.
*/
@DeveloperApi
Member:
This is an internal class, so could you avoid exposing it? How about using Map[String, String] instead in the provider?

Contributor Author:
Good idea, this way the API is less exposed to changes.

Contributor Author — @gaborgsomogyi, Jul 10, 2020:
I've done the initial steps, but I only see bad and worse tradeoffs. The API is less dependent on JDBCOptions changes, but the provider-side implementation becomes more complicated, with either duplicated code or duplicated instantiation. Let me explain with examples:

Option 1: Re-instantiate JDBCOptions. The duplicated instantiation worries me, plus the general concern below.

  override def canHandle(driver: Driver, options: Map[String, String]): Boolean = {
    val jdbcOptions = new JDBCOptions(options)
    jdbcOptions.keytab == null || jdbcOptions.principal == null
  }

Option 2: Parse the needed parameters on my own. The reinventing-the-wheel feeling worries me, plus the general concern below.

  override def canHandle(driver: Driver, options: Map[String, String]): Boolean = {
    val keytab = {
      val keytabParam = options.getOrElse(JDBC_KEYTAB, null)
      if (keytabParam != null && FilenameUtils.getPath(keytabParam).isEmpty) {
        val result = SparkFiles.get(keytabParam)
        logDebug(s"Keytab path not found, assuming --files, file name used on executor: $result")
        result
      } else {
        logDebug("Keytab path found, assuming manual upload")
        keytabParam
      }
    }
    val principal = options.getOrElse(JDBC_PRINCIPAL, null)
    keytab == null || principal == null
  }

General concern: In both cases Spark itself can be fine, because in the first case a new JDBCOptions instantiation and in the second case copy-pasted parsing moved into a base class can fill the gap. However, considering myself a 3rd-party developer, I don't have many options (since JDBCOptions is not exposed as a developer API):

  • Copy and paste the code from JDBCOptions, which may change over time
  • Implement the parameter parsing on my own, which may contain bugs

Considering these findings I think it's better to keep JDBCOptions. WDYT?

Member — @maropu, Jul 10, 2020:
Option 1 looks better. But do 3rd-party developers need to use JDBCOptions there? Or could we just pass only the two params?

  override def canHandle(driver: Driver, keytab: String, principal: String): Boolean = {

Contributor Author:
We could pass the 2 params, but then we would limit further implementation possibilities, so I would vote for the map.
At the moment no params other than keytab and principal are needed, but later providers may need further things. It's not a strong opinion, I just don't want to close off later possibilities. If we agree on the approach I'll do the changes.

Contributor Author:
The example I've given in the other open question here shows how parameters other than keytab and principal can be used. Not passing the whole map would close off this possibility.
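
For illustration only (hypothetical class and option names, not the example referenced above): a provider whose canHandle decision depends on an option beyond keytab and principal, which is only possible because the whole options object is passed in:

  import java.sql.{Connection, Driver}
  import java.util.Properties

  import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
  import org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider

  class TokenAuthConnectionProvider extends JdbcConnectionProvider {
    // "customAuthMode" / "customAuthToken" are made-up option names, used only to
    // show that a provider may need options other than keytab/principal.
    override def canHandle(driver: Driver, options: JDBCOptions): Boolean =
      options.parameters.get("customAuthMode").contains("token")

    override def getConnection(driver: Driver, options: JDBCOptions): Connection = {
      val props = new Properties()
      props.put("accessToken", options.parameters("customAuthToken"))
      driver.connect(options.url, props)
    }
  }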

Member:
I agree with @maropu here. JDBCOptions is under the execution package and is meant to be private.

Contributor Author:
@HyukjinKwon thanks for having a look!

I agree that JDBCOptions mustn't be exposed. Let me change the code to show option 1. As said, passing only keytab: String, principal: String is not enough, because some of the providers need further configuration. I've started to work on this change (unless anybody has a better option).

Contributor Author:
I've removed JDBCOptions from the API, but I'm keeping this open in case further discussion is needed.
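
For reference, a rough sketch of what the Map-based trait shape discussed above could look like (illustrative only; not the code in this commit):

  import java.sql.{Connection, Driver}

  trait JdbcConnectionProvider {
    // Same responsibilities as the JDBCOptions-based version, but the caller
    // passes the raw option map instead of an internal Spark class.
    def canHandle(driver: Driver, options: Map[String, String]): Boolean
    def getConnection(driver: Driver, options: Map[String, String]): Connection
  }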

class JDBCOptions(
@transient val parameters: CaseInsensitiveMap[String])
extends Serializable with Logging {
@@ -62,7 +62,7 @@ object JdbcUtils extends Logging {
throw new IllegalStateException(
s"Did not find registered driver with class $driverClass")
}
val connection = ConnectionProvider.create(driver, options).getConnection()
val connection = ConnectionProvider.create(driver, options)
require(connection != null,
s"The driver could not open a JDBC connection. Check the URL: ${options.url}")

@@ -18,18 +18,29 @@
package org.apache.spark.sql.execution.datasources.jdbc.connection

import java.sql.{Connection, Driver}
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

private[jdbc] class BasicConnectionProvider(driver: Driver, options: JDBCOptions)
extends ConnectionProvider {
def getConnection(): Connection = {
val properties = getAdditionalProperties()
private[jdbc] class BasicConnectionProvider extends JdbcConnectionProvider with Logging {
/**
* Additional properties for data connection (Data source property takes precedence).
*/
def getAdditionalProperties(options: JDBCOptions): Properties = new Properties()

override def canHandle(driver: Driver, options: JDBCOptions): Boolean = {
options.keytab == null || options.principal == null
}

override def getConnection(driver: Driver, options: JDBCOptions): Connection = {
val properties = getAdditionalProperties(options)
options.asConnectionProperties.entrySet().asScala.foreach { e =>
properties.put(e.getKey(), e.getValue())
}
logDebug(s"JDBC connection initiated with URL: ${options.url} and properties: $properties")
driver.connect(options.url, properties)
}
}
@@ -18,60 +18,41 @@
package org.apache.spark.sql.execution.datasources.jdbc.connection

import java.sql.{Connection, Driver}
import java.util.Properties
import java.util.ServiceLoader

import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

/**
* Connection provider which opens connection toward various databases (database specific instance
* needed). If kerberos authentication required then it's the provider's responsibility to set all
* the parameters.
*/
private[jdbc] trait ConnectionProvider {
/**
* Additional properties for data connection (Data source property takes precedence).
*/
def getAdditionalProperties(): Properties = new Properties()

/**
* Opens connection toward the database.
*/
def getConnection(): Connection
}
import org.apache.spark.util.Utils

private[jdbc] object ConnectionProvider extends Logging {
def create(driver: Driver, options: JDBCOptions): ConnectionProvider = {
if (options.keytab == null || options.principal == null) {
logDebug("No authentication configuration found, using basic connection provider")
new BasicConnectionProvider(driver, options)
} else {
logDebug("Authentication configuration found, using database specific connection provider")
options.driverClass match {
case PostgresConnectionProvider.driverClass =>
logDebug("Postgres connection provider found")
new PostgresConnectionProvider(driver, options)

case MariaDBConnectionProvider.driverClass =>
logDebug("MariaDB connection provider found")
new MariaDBConnectionProvider(driver, options)

case DB2ConnectionProvider.driverClass =>
logDebug("DB2 connection provider found")
new DB2ConnectionProvider(driver, options)

case MSSQLConnectionProvider.driverClass =>
logDebug("MS SQL connection provider found")
new MSSQLConnectionProvider(driver, options)

case OracleConnectionProvider.driverClass =>
logDebug("Oracle connection provider found")
new OracleConnectionProvider(driver, options)

case _ =>
throw new IllegalArgumentException(s"Driver ${options.driverClass} does not support " +
"Kerberos authentication")
private val providers = loadProviders()

def loadProviders(): Seq[JdbcConnectionProvider] = {
val loader = ServiceLoader.load(classOf[JdbcConnectionProvider],
Utils.getContextOrSparkClassLoader)
val providers = mutable.ArrayBuffer[JdbcConnectionProvider]()

val iterator = loader.iterator
while (iterator.hasNext) {
try {
val provider = iterator.next
logDebug(s"Loaded built in provider: $provider")
providers += provider
} catch {
case t: Throwable =>
logError(s"Failed to load built in provider.", t)
Member:
I am getting the following exception on my console constantly while running JDBC tests. Should it really be logged as an error?

14:31:25.070 ERROR org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider: Failed to load built in provider.
java.util.ServiceConfigurationError: org.apache.spark.sql.jdbc.JdbcConnectionProvider: Provider org.apache.spark.sql.execution.datasources.jdbc.connection.IntentionallyFaultyConnectionProvider could not be instantiated
	at java.util.ServiceLoader.fail(ServiceLoader.java:232)
	at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider$.loadProviders(ConnectionProvider.scala:41)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider$.<init>(ConnectionProvider.scala:31)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider$.<clinit>(ConnectionProvider.scala)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:66)
	at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog.withConnection(JDBCTableCatalog.scala:156)
	at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog.listTables(JDBCTableCatalog.scala:58)
	at org.apache.spark.sql.execution.datasources.v2.ShowTablesExec.run(ShowTablesExec.scala:42)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3675)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:769)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3673)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:769)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:612)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:769)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:607)
	at org.apache.spark.sql.test.SQLTestUtilsBase.$anonfun$sql$1(SQLTestUtils.scala:231)
	at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalogSuite.$anonfun$new$2(JDBCTableCatalogSuite.scala:67)
	at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:134)
	at org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalogSuite.$anonfun$new$1(JDBCTableCatalogSuite.scala:67)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:176)
	at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:61)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
	at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:61)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
	at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
	at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
	at org.scalatest.Suite.run(Suite.scala:1112)
	at org.scalatest.Suite.run$(Suite.scala:1094)
	at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
	at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
	at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:61)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:61)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:40)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
Caused by: java.lang.IllegalArgumentException: Intentional Exception
	at org.apache.spark.sql.execution.datasources.jdbc.connection.IntentionallyFaultyConnectionProvider.<init>(IntentionallyFaultyConnectionProvider.scala:26)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at java.lang.Class.newInstance(Class.java:442)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
	... 82 more

Member:
Maybe log it as a warning?

Contributor Author:
We can decrease it to a warning. The main point is to notify the user.

Member:
Is it okay to ignore the error case where it fails to load built-in providers? DataSource throws an exception if it fails to load built-in datasources:

  case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
    // NoClassDefFoundError's class name uses "/" rather than "." for packages
    val className = e.getMessage.replaceAll("/", ".")
    if (spark2RemovedClasses.contains(className)) {
      throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
        "Please check if your library is compatible with Spark 2.0", e)
    } else {
      throw e
    }

Contributor Author:
What do you mean by ignore? Providers must be loaded independently, so we need to catch and ignore the exception.

Contributor Author:
If the suggestion is to let the exception propagate, then I would say it's a bad idea. If a provider can't be loaded, the rest must still work. We've had a similar issue and expectation in the Hadoop delegation token area.

Member — @maropu, Oct 7, 2020:
Yea, the policy looks okay for secure connections, but how about the basic one, BasicConnectionProvider? At least until Spark v3.0, creating basic JDBC connections does not fail because of the loading failure.

Contributor Author:
At least, until Spark v3.0, creating basic JDBC connections does not fail because of the loading failure.

Here it's the same, since BasicConnectionProvider is built-in and there is no pre-loading inside. The main issue would come when one adds a new provider which can't be loaded. That failure would make all the rest of the workload fail if we don't load them independently.

}
}
providers
}

def create(driver: Driver, options: JDBCOptions): Connection = {
val filteredProviders = providers.filter(_.canHandle(driver, options))
logDebug(s"Filtered providers: $filteredProviders")
require(filteredProviders.size == 1,
"JDBC connection initiated but not exactly one connection provider found which can handle it")
filteredProviders.head.getConnection(driver, options)
}
}
@@ -23,39 +23,39 @@ import java.util.Properties

import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.security.SecurityConfigurationLock
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

private[sql] class DB2ConnectionProvider(driver: Driver, options: JDBCOptions)
extends SecureConnectionProvider(driver, options) {
override val appEntry: String = "JaasClient"
private[sql] class DB2ConnectionProvider extends SecureConnectionProvider {
override val driverClass = "com.ibm.db2.jcc.DB2Driver"

override def getConnection(): Connection = {
setAuthenticationConfigIfNeeded()
override def appEntry(driver: Driver, options: JDBCOptions): String = "JaasClient"

override def getConnection(driver: Driver, options: JDBCOptions): Connection = {
setAuthenticationConfigIfNeeded(driver, options)
UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs(
new PrivilegedExceptionAction[Connection]() {
override def run(): Connection = {
DB2ConnectionProvider.super.getConnection()
DB2ConnectionProvider.super.getConnection(driver, options)
}
}
)
}

override def getAdditionalProperties(): Properties = {
override def getAdditionalProperties(options: JDBCOptions): Properties = {
val result = new Properties()
// 11 is the integer value for kerberos
result.put("securityMechanism", new String("11"))
result.put("KerberosServerPrincipal", options.principal)
result
}

override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized {
val (parent, configEntry) = getConfigWithAppEntry()
override def setAuthenticationConfigIfNeeded(
driver: Driver,
options: JDBCOptions): Unit = SecurityConfigurationLock.synchronized {
val (parent, configEntry) = getConfigWithAppEntry(driver, options)
if (configEntry == null || configEntry.isEmpty) {
setAuthenticationConfig(parent)
setAuthenticationConfig(parent, driver, options)
}
}
}

private[sql] object DB2ConnectionProvider {
val driverClass = "com.ibm.db2.jcc.DB2Driver"
}
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.jdbc.connection

import java.sql.{Connection, Driver}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

/**
* ::DeveloperApi::
* Connection provider which opens connections toward various databases (a database-specific
* instance is needed). If any authentication is required then it is the provider's responsibility
* to set all the parameters. If the global JVM security configuration is changed then
* <code>SecurityConfigurationLock</code> must be used as a lock to avoid races.
* Important to mention: connection providers within a JVM are used from multiple threads, so
* adding internal state is not advised. If any state is added then it must be synchronized
* properly.
*/
@DeveloperApi
trait JdbcConnectionProvider {
/**
* Checks if this connection provider instance can handle the connection initiated by the driver.
* There must be exactly one active connection provider which can handle the connection for a
* specific driver. If this requirement isn't met then <code>IllegalArgumentException</code>
* will be thrown by the provider framework.
* @param driver Java driver which initiates the connection
* @param options Driver options with which the connection is initiated
* @return True if the connection provider can handle the driver with the given options.
*/
def canHandle(driver: Driver, options: JDBCOptions): Boolean

/**
* Opens connection toward the database.
* @param driver Java driver which initiates the connection
* @param options Driver options with which the connection is initiated
* @return a <code>Connection</code> object that represents a connection to the URL
*/
def getConnection(driver: Driver, options: JDBCOptions): Connection
}
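
As a usage note, a minimal sketch (hypothetical package, class, and driver names) of how a third-party provider could implement this trait; the ServiceLoader-based discovery shown earlier would pick it up if the jar also ships a META-INF/services/org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider resource containing the implementation class name:

  package com.example.jdbc

  import java.sql.{Connection, Driver}
  import java.util.Properties

  import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
  import org.apache.spark.sql.execution.datasources.jdbc.connection.JdbcConnectionProvider

  // Hypothetical provider for a fictional "exampledb" JDBC driver.
  class ExampleDBConnectionProvider extends JdbcConnectionProvider {
    // Claim only this driver class so that exactly one provider matches and the
    // framework's "exactly one provider" requirement is satisfied.
    override def canHandle(driver: Driver, options: JDBCOptions): Boolean =
      options.driverClass == "com.example.jdbc.ExampleDriver"

    override def getConnection(driver: Driver, options: JDBCOptions): Connection = {
      // Any JVM-global security configuration change would have to be wrapped in
      // SecurityConfigurationLock.synchronized { ... } as documented above.
      driver.connect(options.url, new Properties())
    }
  }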
@@ -23,14 +23,14 @@ import java.util.Properties

import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.security.SecurityConfigurationLock
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

private[sql] class MSSQLConnectionProvider(
driver: Driver,
options: JDBCOptions,
parserMethod: String = "parseAndMergeProperties"
) extends SecureConnectionProvider(driver, options) {
override val appEntry: String = {
private[sql] class MSSQLConnectionProvider extends SecureConnectionProvider {
override val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
val parserMethod: String = "parseAndMergeProperties"

override def appEntry(driver: Driver, options: JDBCOptions): String = {
val configName = "jaasConfigurationName"
val appEntryDefault = "SQLJDBCDriver"

@@ -58,27 +58,29 @@ private[sql] class MSSQLConnectionProvider(
}
}

override def getConnection(): Connection = {
setAuthenticationConfigIfNeeded()
override def getConnection(driver: Driver, options: JDBCOptions): Connection = {
setAuthenticationConfigIfNeeded(driver, options)
UserGroupInformation.loginUserFromKeytabAndReturnUGI(options.principal, options.keytab).doAs(
new PrivilegedExceptionAction[Connection]() {
override def run(): Connection = {
MSSQLConnectionProvider.super.getConnection()
MSSQLConnectionProvider.super.getConnection(driver, options)
}
}
)
}

override def getAdditionalProperties(): Properties = {
override def getAdditionalProperties(options: JDBCOptions): Properties = {
val result = new Properties()
// These props needed to reach internal kerberos authentication in the JDBC driver
result.put("integratedSecurity", "true")
result.put("authenticationScheme", "JavaKerberos")
result
}

override def setAuthenticationConfigIfNeeded(): Unit = SecurityConfigurationLock.synchronized {
val (parent, configEntry) = getConfigWithAppEntry()
override def setAuthenticationConfigIfNeeded(
driver: Driver,
options: JDBCOptions): Unit = SecurityConfigurationLock.synchronized {
val (parent, configEntry) = getConfigWithAppEntry(driver, options)
/**
* Couple of things to mention here (v8.2.2 client):
* 1. MS SQL supports JAAS application name configuration
@@ -87,11 +89,7 @@ private[sql] class MSSQLConnectionProvider(
val entryUsesKeytab = configEntry != null &&
configEntry.exists(_.getOptions().get("useKeyTab") == "true")
if (configEntry == null || configEntry.isEmpty || !entryUsesKeytab) {
setAuthenticationConfig(parent)
setAuthenticationConfig(parent, driver, options)
}
}
}

private[sql] object MSSQLConnectionProvider {
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}