
Commit

[SPARK-33440][CORE] Use current timestamp with warning log in HadoopFSDelegationTokenProvider when the issue date for token is not set up properly

### What changes were proposed in this pull request?

This PR proposes to use the current timestamp, along with a warning log, when the issue date for a token is not set up properly. The next section explains the rationale in detail.

### Why are the changes needed?

Unfortunately, not every implementation respects the `issue date` in `AbstractDelegationTokenIdentifier`, which Spark relies on when calculating the renewal interval. The default value of the issue date is 0L, which is far from the actual issue date. This breaks the logic for calculating the next renewal date under some circumstances, leading to a 0 interval (immediate) when rescheduling token renewal.
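
For context, here is a minimal, hypothetical sketch of how an identifier ends up with the default issue date when its implementor never calls `setIssueDate()`. `ExampleTokenIdentifier` is an illustrative name, not a real token implementation:

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

// Hypothetical identifier whose implementor never sets the issue date.
class ExampleTokenIdentifier extends AbstractDelegationTokenIdentifier {
  override def getKind: Text = new Text("EXAMPLE_TOKEN")
}

val id = new ExampleTokenIdentifier()
id.getIssueDate  // 0L -- the default, i.e. the Unix epoch, far from the actual issue time
```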

In HadoopFSDelegationTokenProvider, Spark calculates the token renewal interval as follows:

https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala#L123-L134

The interval is calculated as `token.renew() - identifier.getIssueDate`, which yields the correct interval as long as both `token.renew()` and `identifier.getIssueDate` produce correct values. It goes wrong, however, when `identifier.getIssueDate` returns 0L (the default value), as below:

```
20/10/13 06:34:19 INFO security.HadoopFSDelegationTokenProvider: Renewal interval is 1603175657000 for token S3ADelegationToken/IDBroker
20/10/13 06:34:19 INFO security.HadoopFSDelegationTokenProvider: Renewal interval is 86400048 for token HDFS_DELEGATION_TOKEN
```
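
To make those two log lines concrete, here is an illustrative sketch of the subtraction for each token, using the numbers from the log above (not the provider's actual code):

```scala
// S3ADelegationToken/IDBroker: the identifier never set an issue date, so it is 0L.
val s3Expiration = 1603175657000L    // epoch millis returned by token.renew()
val s3Interval = s3Expiration - 0L   // 1603175657000: an absolute timestamp, not an interval

// HDFS_DELEGATION_TOKEN: the issue date is set properly, so the subtraction is a real interval.
val hdfsInterval = 86400048L         // roughly one day, as expected
```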

Fortunately, we pick the minimum value as a safety guard (so `86400048` is picked up here), but that safety guard has an unintended bad impact in this case.

https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala#L58-L71

Spark takes the interval calculated above (the "minimum" of the intervals) and blindly adds it to the token's issue date to calculate the next renewal date for the token, then picks the "minimum" value again. In the problematic case, the value would be `86400048` (86400048 + 0), which is far smaller than the current timestamp.
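
Plugging the numbers from the example above into that step (an illustrative sketch, not the actual code):

```scala
val interval = 86400048L                 // the "minimum" renewal interval picked above (~1 day)
val issueDate = 0L                       // default value: the implementation never set it
val nextRenewal = issueDate + interval   // 86400048 == Jan 2, 1970 -- long before the current time
```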

https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala#L228-L234

The current timestamp is then subtracted from the next renewal date to get the delay, which is multiplied by the configured ratio to produce the final schedule interval. In the problematic case, this value goes negative.

https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala#L180-L188

There is a safety guard that disallows negative values, but it simply clamps the delay to 0, meaning "schedule immediately". This triggers the next calculation of the next renewal date and schedule interval, which leads to the same behavior, hence the delegation token is updated immediately and continuously.
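
Carrying the same illustrative numbers through the last two steps (assuming the default ratio of 0.75 for `CREDENTIALS_RENEWAL_INTERVAL_RATIO`):

```scala
val nextRenewal = 86400048L                       // next renewal date derived from issue date 0L
val now = System.currentTimeMillis()
val ratio = 0.75                                  // assumed default of CREDENTIALS_RENEWAL_INTERVAL_RATIO
val delay = (ratio * (nextRenewal - now)).toLong  // large negative number
val _delay = math.max(0, delay)                   // safety guard clamps it to 0 -> renew immediately,
                                                  // recompute the same stale nextRenewal, and loop
```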

Since we fetch the token just before the calculation happens, the actual issue date is likely only slightly earlier, so it is not that dangerous to use the current timestamp as the issue date for a token whose issue date has not been set up properly. Still, it is better not to leave the token implementation as it is, so we log a warning message to let end users consult the token implementer.

### Does this PR introduce _any_ user-facing change?

Yes. End users will no longer encounter the tight loop of token renewal scheduling after this PR. From the end user's perspective, there is nothing that needs to change.

### How was this patch tested?

Manually tested in a problematic environment.

Closes apache#30366 from HeartSaVioR/SPARK-33440.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
HeartSaVioR committed Nov 30, 2020
1 parent c699435 commit f5d2165
Showing 2 changed files with 27 additions and 4 deletions.

core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala

```
@@ -178,7 +178,7 @@ private[spark] class HadoopDelegationTokenManager(
 
   private def scheduleRenewal(delay: Long): Unit = {
     val _delay = math.max(0, delay)
-    logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.")
+    logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(_delay)}.")
 
     val renewalTask = new Runnable() {
       override def run(): Unit = {
@@ -230,6 +230,8 @@ private[spark] class HadoopDelegationTokenManager(
     val now = System.currentTimeMillis
     val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
     val delay = (ratio * (nextRenewal - now)).toLong
+    logInfo(s"Calculated delay on renewal is $delay, based on next renewal $nextRenewal " +
+      s"and the ratio $ratio, and current time $now")
     scheduleRenewal(delay)
     creds
   }
```

core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala

```
@@ -63,7 +63,8 @@ private[deploy] class HadoopFSDelegationTokenProvider
       val identifier = token
         .decodeIdentifier()
         .asInstanceOf[AbstractDelegationTokenIdentifier]
-      identifier.getIssueDate + interval
+      val tokenKind = token.getKind.toString
+      getIssueDate(tokenKind, identifier) + interval
     }
     if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
   }
@@ -126,13 +127,33 @@ private[deploy] class HadoopFSDelegationTokenProvider
       Try {
         val newExpiration = token.renew(hadoopConf)
         val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
-        val interval = newExpiration - identifier.getIssueDate
-        logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
+        val tokenKind = token.getKind.toString
+        val interval = newExpiration - getIssueDate(tokenKind, identifier)
+        logInfo(s"Renewal interval is $interval for token $tokenKind")
         interval
       }.toOption
     }
     if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
   }
+
+  private def getIssueDate(kind: String, identifier: AbstractDelegationTokenIdentifier): Long = {
+    val now = System.currentTimeMillis()
+    val issueDate = identifier.getIssueDate
+    if (issueDate > now) {
+      logWarning(s"Token $kind has set up issue date later than current time. (provided: " +
+        s"$issueDate / current timestamp: $now) Please make sure clocks are in sync between " +
+        "machines. If the issue is not a clock mismatch, consult token implementor to check " +
+        "whether issue date is valid.")
+      issueDate
+    } else if (issueDate > 0L) {
+      issueDate
+    } else {
+      logWarning(s"Token $kind has not set up issue date properly. (provided: $issueDate) " +
+        s"Using current timestamp ($now) as issue date instead. Consult token implementor to fix " +
+        "the behavior.")
+      now
+    }
+  }
 }
 
 private[deploy] object HadoopFSDelegationTokenProvider {
```
