Skip to content

Commit 7c22433

Browse files
committed
Review fixes:
* Reordered TokenUtil methods to make it more readable. * Enhanced additional parameters doc to make things more explicit.
1 parent fffd139 commit 7c22433

File tree

2 files changed

+33
-32
lines changed
  • core/src/main/scala/org/apache/spark/internal/config
  • external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010

2 files changed

+33
-32
lines changed

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -651,37 +651,38 @@ package object config {
651651
private[spark] val KAFKA_BOOTSTRAP_SERVERS =
652652
ConfigBuilder("spark.kafka.bootstrap.servers")
653653
.doc("A list of comma separated host/port pairs to use for establishing the initial " +
654-
"connection to the Kafka cluster. For further details please see kafka documentation.")
654+
"connection to the Kafka cluster. For further details please see kafka documentation. " +
655+
"Only used to obtain delegation token.")
655656
.stringConf
656657
.createOptional
657658

658659
private[spark] val KAFKA_SECURITY_PROTOCOL =
659660
ConfigBuilder("spark.kafka.security.protocol")
660661
.doc("Protocol used to communicate with brokers. For further details please see kafka " +
661-
"documentation.")
662+
"documentation. Only used to obtain delegation token.")
662663
.stringConf
663664
.createWithDefault("SASL_SSL")
664665

665666
private[spark] val KAFKA_KERBEROS_SERVICE_NAME =
666667
ConfigBuilder("spark.kafka.sasl.kerberos.service.name")
667668
.doc("The Kerberos principal name that Kafka runs as. This can be defined either in " +
668669
"Kafka's JAAS config or in Kafka's config. For further details please see kafka " +
669-
"documentation.")
670+
"documentation. Only used to obtain delegation token.")
670671
.stringConf
671672
.createOptional
672673

673674
private[spark] val KAFKA_TRUSTSTORE_LOCATION =
674675
ConfigBuilder("spark.kafka.ssl.truststore.location")
675676
.doc("The location of the trust store file. For further details please see kafka " +
676-
"documentation.")
677+
"documentation. Only used to obtain delegation token.")
677678
.stringConf
678679
.createOptional
679680

680681
private[spark] val KAFKA_TRUSTSTORE_PASSWORD =
681682
ConfigBuilder("spark.kafka.ssl.truststore.password")
682683
.doc("The store password for the trust store file. This is optional for client and only " +
683684
"needed if ssl.truststore.location is configured. For further details please see kafka " +
684-
"documentation.")
685+
"documentation. Only used to obtain delegation token.")
685686
.stringConf
686687
.createOptional
687688
}

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/TokenUtil.scala

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -40,20 +40,19 @@ private[kafka010] object TokenUtil extends Logging {
4040
override def getKind: Text = TOKEN_KIND;
4141
}
4242

43-
private def printToken(token: DelegationToken): Unit = {
44-
if (log.isDebugEnabled) {
45-
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
46-
logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
47-
"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
48-
val tokenInfo = token.tokenInfo
49-
logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
50-
tokenInfo.tokenId,
51-
tokenInfo.owner,
52-
tokenInfo.renewersAsString,
53-
dateFormat.format(tokenInfo.issueTimestamp),
54-
dateFormat.format(tokenInfo.expiryTimestamp),
55-
dateFormat.format(tokenInfo.maxTimestamp)))
56-
}
43+
def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
44+
val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
45+
val createDelegationTokenOptions = new CreateDelegationTokenOptions()
46+
val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
47+
val token = createResult.delegationToken().get()
48+
printToken(token)
49+
50+
new Token[KafkaDelegationTokenIdentifier](
51+
token.tokenInfo.tokenId.getBytes,
52+
token.hmacAsBase64String.getBytes,
53+
TOKEN_KIND,
54+
TOKEN_SERVICE
55+
)
5756
}
5857

5958
private[kafka010] def createAdminClientProperties(sparkConf: SparkConf): Properties = {
@@ -94,18 +93,19 @@ private[kafka010] object TokenUtil extends Logging {
9493
adminClientProperties
9594
}
9695

97-
def obtainToken(sparkConf: SparkConf): Token[_ <: TokenIdentifier] = {
98-
val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
99-
val createDelegationTokenOptions = new CreateDelegationTokenOptions()
100-
val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
101-
val token = createResult.delegationToken().get()
102-
printToken(token)
103-
104-
new Token[KafkaDelegationTokenIdentifier](
105-
token.tokenInfo.tokenId.getBytes,
106-
token.hmacAsBase64String.getBytes,
107-
TOKEN_KIND,
108-
TOKEN_SERVICE
109-
)
96+
private def printToken(token: DelegationToken): Unit = {
97+
if (log.isDebugEnabled) {
98+
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
99+
logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
100+
"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
101+
val tokenInfo = token.tokenInfo
102+
logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
103+
tokenInfo.tokenId,
104+
tokenInfo.owner,
105+
tokenInfo.renewersAsString,
106+
dateFormat.format(tokenInfo.issueTimestamp),
107+
dateFormat.format(tokenInfo.expiryTimestamp),
108+
dateFormat.format(tokenInfo.maxTimestamp)))
109+
}
110110
}
111111
}

0 commit comments

Comments
 (0)