@@ -563,24 +563,16 @@ private[kafka010] object KafkaSourceProvider extends Logging {
563
563
// For this no additional parameter needed.
564
564
// - Token is provided -> try to log in with scram module using kafka's dynamic JAAS
565
565
// configuration.
566
- // - Token not provided -> try to log in with kerberos module and ticket cache using kafka's
567
- // dynamic JAAS configuration.
568
566
if (KafkaTokenUtil .isGlobalJaasConfigurationProvided) {
569
567
logDebug(" JVM global security configuration detected, using it for login." )
570
- } else {
571
- if (KafkaSecurityHelper .isTokenAvailable()) {
572
- logDebug(" Delegation token detected, using it for login." )
573
- val jaasParams = KafkaSecurityHelper .getTokenJaasParams(SparkEnv .get.conf)
574
- val mechanism = kafkaParams
575
- .getOrElse(SaslConfigs .SASL_MECHANISM , SaslConfigs .DEFAULT_SASL_MECHANISM )
576
- require(mechanism.startsWith(" SCRAM" ),
577
- " Delegation token works only with SCRAM mechanism." )
578
- set(SaslConfigs .SASL_JAAS_CONFIG , jaasParams)
579
- } else {
580
- logDebug(" Using ticket cache for login." )
581
- val jaasParams = KafkaTokenUtil .getTicketCacheJaasParams(SparkEnv .get.conf)
582
- set(SaslConfigs .SASL_JAAS_CONFIG , jaasParams)
583
- }
568
+ } else if (KafkaSecurityHelper .isTokenAvailable()) {
569
+ logDebug(" Delegation token detected, using it for login." )
570
+ val jaasParams = KafkaSecurityHelper .getTokenJaasParams(SparkEnv .get.conf)
571
+ val mechanism = kafkaParams
572
+ .getOrElse(SaslConfigs .SASL_MECHANISM , SaslConfigs .DEFAULT_SASL_MECHANISM )
573
+ require(mechanism.startsWith(" SCRAM" ),
574
+ " Delegation token works only with SCRAM mechanism." )
575
+ set(SaslConfigs .SASL_JAAS_CONFIG , jaasParams)
584
576
}
585
577
this
586
578
}
0 commit comments